From 0c7d5f11d7187904022ea49b6a76b54d7b280345 Mon Sep 17 00:00:00 2001 From: Cameron Clough Date: Sat, 24 Feb 2024 21:56:28 +0000 Subject: [PATCH] use pyupgrade to update syntax (#1889) --- board/jungle/__init__.py | 3 +- board/jungle/scripts/get_version.py | 2 +- examples/can_bit_transition.py | 2 +- examples/can_unique.py | 6 +- examples/query_fw_versions.py | 3 +- pyproject.toml | 6 +- python/__init__.py | 9 ++- python/constants.py | 4 +- python/dfu.py | 25 ++++---- python/isotp.py | 4 +- python/serial.py | 2 +- python/spi.py | 6 +- python/uds.py | 89 +++++++++++++++-------------- setup.py | 2 - tests/bulk_write_test.py | 4 +- tests/elm_car_simulator.py | 4 +- tests/elm_throughput.py | 2 +- tests/fan/fan_tuning.py | 2 +- tests/get_version.py | 2 +- tests/gmbitbang/recv.py | 3 +- tests/hitl/4_can_loopback.py | 4 +- tests/hitl/helpers.py | 3 +- tests/libpanda/libpanda_py.py | 4 +- tests/message_drop_test.py | 8 +-- tests/read_winusb_descriptors.py | 8 +-- tests/safety/common.py | 28 ++++----- tests/safety/hyundai_common.py | 5 +- tests/safety/test_gm.py | 5 +- tests/safety/test_honda.py | 7 +-- 29 files changed, 122 insertions(+), 130 deletions(-) diff --git a/board/jungle/__init__.py b/board/jungle/__init__.py index 77a6e3c7c..a95ddfa92 100644 --- a/board/jungle/__init__.py +++ b/board/jungle/__init__.py @@ -2,7 +2,6 @@ import os import struct from functools import wraps -from typing import Optional from panda import Panda, PandaDFU from panda.python.constants import McuType @@ -57,7 +56,7 @@ class PandaJungle(Panda): fn = os.path.join(FW_PATH, self._mcu_type.config.app_fn.replace("panda", "panda_jungle")) super().flash(fn=fn, code=code, reconnect=reconnect) - def recover(self, timeout: Optional[int] = 60, reset: bool = True) -> bool: + def recover(self, timeout: int | None = 60, reset: bool = True) -> bool: dfu_serial = self.get_dfu_serial() if reset: diff --git a/board/jungle/scripts/get_version.py b/board/jungle/scripts/get_version.py index ad4a1c426..4fc9d30be 100755 --- a/board/jungle/scripts/get_version.py +++ b/board/jungle/scripts/get_version.py @@ -4,6 +4,6 @@ from panda import PandaJungle if __name__ == "__main__": for p in PandaJungle.list(): pp = PandaJungle(p) - print("%s: %s" % (pp.get_serial()[0], pp.get_version())) + print(f"{pp.get_serial()[0]}: {pp.get_version()}") diff --git a/examples/can_bit_transition.py b/examples/can_bit_transition.py index bc9b68ae8..1e7bad54e 100755 --- a/examples/can_bit_transition.py +++ b/examples/can_bit_transition.py @@ -66,7 +66,7 @@ class Info(): message_id = message_id[2:] # remove leading '0x' else: message_id = hex(int(message_id))[2:] # old message IDs are in decimal - message_id = '%s:%s' % (bus, message_id) + message_id = f'{bus}:{message_id}' data = row[CSV_KEYS[dtype]["data"]] if data.startswith('0x'): diff --git a/examples/can_unique.py b/examples/can_unique.py index bc582f016..05977afb0 100755 --- a/examples/can_unique.py +++ b/examples/can_unique.py @@ -52,7 +52,7 @@ class Info(): def load(self, filename): """Given a CSV file, adds information about message IDs and their values.""" - with open(filename, 'r') as inp: + with open(filename) as inp: reader = csv.reader(inp) header = next(reader, None) if header[0] == 'time': @@ -64,7 +64,7 @@ class Info(): for row in reader: bus = row[2] message_id = hex(int(row[1]))[2:] - message_id = '%s:%s' % (bus, message_id) + message_id = f'{bus}:{message_id}' data = row[3] self.store(message_id, data) @@ -75,7 +75,7 @@ class Info(): message_id = row[1][2:] # remove leading '0x' else: message_id = hex(int(row[1]))[2:] # old message IDs are in decimal - message_id = '%s:%s' % (bus, message_id) + message_id = f'{bus}:{message_id}' if row[1].startswith('0x'): data = row[2][2:] # remove leading '0x' else: diff --git a/examples/query_fw_versions.py b/examples/query_fw_versions.py index 4f4e3fa66..fe70bf96a 100755 --- a/examples/query_fw_versions.py +++ b/examples/query_fw_versions.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 import argparse -from typing import List, Optional from tqdm import tqdm from panda import Panda from panda.python.uds import UdsClient, MessageTimeoutError, NegativeResponseError, InvalidSubAddressError, \ @@ -25,7 +24,7 @@ if __name__ == "__main__": addrs += [0x18da0000 + (i << 8) + 0xf1 for i in range(256)] results = {} - sub_addrs: List[Optional[int]] = [None] + sub_addrs: list[int | None] = [None] if args.sub_addr: if args.sub_addr == "scan": sub_addrs = list(range(0xff + 1)) diff --git a/pyproject.toml b/pyproject.toml index 72bce22be..ee89db28e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,9 +1,11 @@ # https://beta.ruff.rs/docs/configuration/#using-pyprojecttoml [tool.ruff] -select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF100", "A"] -ignore = ["W292", "E741", "E402", "C408", "ISC003"] line-length = 160 target-version="py311" + +[tool.ruff.lint] +select = ["E", "F", "W", "PIE", "C4", "ISC", "RUF100", "A"] +ignore = ["W292", "E741", "E402", "C408", "ISC003"] flake8-implicit-str-concat.allow-multiline=false [tool.pytest.ini_options] diff --git a/python/__init__.py b/python/__init__.py index acf6ea483..6e614775d 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -9,7 +9,6 @@ import binascii import datetime import logging from functools import wraps, partial -from typing import Optional from itertools import accumulate from .base import BaseHandle @@ -234,7 +233,7 @@ class Panda: FLAG_FORD_LONG_CONTROL = 1 FLAG_FORD_CANFD = 2 - def __init__(self, serial: Optional[str] = None, claim: bool = True, disable_checks: bool = True, can_speed_kbps: int = 500): + def __init__(self, serial: str | None = None, claim: bool = True, disable_checks: bool = True, can_speed_kbps: int = 500): self._connect_serial = serial self._disable_checks = disable_checks @@ -530,7 +529,7 @@ class Panda: if reconnect: self.reconnect() - def recover(self, timeout: Optional[int] = 60, reset: bool = True) -> bool: + def recover(self, timeout: int | None = 60, reset: bool = True) -> bool: dfu_serial = self.get_dfu_serial() if reset: @@ -549,7 +548,7 @@ class Panda: return True @staticmethod - def wait_for_dfu(dfu_serial: Optional[str], timeout: Optional[int] = None) -> bool: + def wait_for_dfu(dfu_serial: str | None, timeout: int | None = None) -> bool: t_start = time.monotonic() dfu_list = PandaDFU.list() while (dfu_serial is None and len(dfu_list) == 0) or (dfu_serial is not None and dfu_serial not in dfu_list): @@ -561,7 +560,7 @@ class Panda: return True @staticmethod - def wait_for_panda(serial: Optional[str], timeout: int) -> bool: + def wait_for_panda(serial: str | None, timeout: int) -> bool: t_start = time.monotonic() serials = Panda.list() while (serial is None and len(serials) == 0) or (serial is not None and serial not in serials): diff --git a/python/constants.py b/python/constants.py index fede52424..8b0959308 100644 --- a/python/constants.py +++ b/python/constants.py @@ -1,6 +1,6 @@ import os import enum -from typing import List, NamedTuple +from typing import NamedTuple BASEDIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../") FW_PATH = os.path.join(BASEDIR, "board/obj/") @@ -10,7 +10,7 @@ USBPACKET_MAX_SIZE = 0x40 class McuConfig(NamedTuple): mcu: str mcu_idcode: int - sector_sizes: List[int] + sector_sizes: list[int] sector_count: int # total sector count, used for MCU identification in DFU mode uid_address: int block_size: int diff --git a/python/dfu.py b/python/dfu.py index 01ff037f8..9beba45e5 100644 --- a/python/dfu.py +++ b/python/dfu.py @@ -2,7 +2,6 @@ import os import usb1 import struct import binascii -from typing import List, Optional from .base import BaseSTBootloaderHandle from .spi import STBootloaderSPIHandle, PandaSpiException @@ -11,9 +10,9 @@ from .constants import FW_PATH, McuType class PandaDFU: - def __init__(self, dfu_serial: Optional[str]): + def __init__(self, dfu_serial: str | None): # try USB, then SPI - handle: Optional[BaseSTBootloaderHandle] + handle: BaseSTBootloaderHandle | None self._context, handle = PandaDFU.usb_connect(dfu_serial) if handle is None: self._context, handle = PandaDFU.spi_connect(dfu_serial) @@ -38,7 +37,7 @@ class PandaDFU: self._context.close() @staticmethod - def usb_connect(dfu_serial: Optional[str]): + def usb_connect(dfu_serial: str | None): handle = None context = usb1.USBContext() context.open() @@ -56,7 +55,7 @@ class PandaDFU: return context, handle @staticmethod - def spi_connect(dfu_serial: Optional[str]): + def spi_connect(dfu_serial: str | None): handle = None this_dfu_serial = None @@ -72,13 +71,7 @@ class PandaDFU: return None, handle @staticmethod - def list() -> List[str]: - ret = PandaDFU.usb_list() - ret += PandaDFU.spi_list() - return list(set(ret)) - - @staticmethod - def usb_list() -> List[str]: + def usb_list() -> list[str]: dfu_serials = [] try: with usb1.USBContext() as context: @@ -93,7 +86,7 @@ class PandaDFU: return dfu_serials @staticmethod - def spi_list() -> List[str]: + def spi_list() -> list[str]: try: _, h = PandaDFU.spi_connect(None) if h is not None: @@ -134,3 +127,9 @@ class PandaDFU: code = f.read() self.program_bootstub(code) self.reset() + + @staticmethod + def list() -> list[str]: + ret = PandaDFU.usb_list() + ret += PandaDFU.spi_list() + return list(set(ret)) diff --git a/python/isotp.py b/python/isotp.py index 3334deb8e..d0bef7d94 100644 --- a/python/isotp.py +++ b/python/isotp.py @@ -79,10 +79,10 @@ def isotp_send(panda, x, addr, bus=0, recvaddr=None, subaddr=None, rate=None): sends = [] while len(x) > 0: if subaddr: - sends.append(((bytes([subaddr, 0x20 + (idx & 0xF)]) + x[0:6]).ljust(8, b"\x00"))) + sends.append((bytes([subaddr, 0x20 + (idx & 0xF)]) + x[0:6]).ljust(8, b"\x00")) x = x[6:] else: - sends.append(((bytes([0x20 + (idx & 0xF)]) + x[0:7]).ljust(8, b"\x00"))) + sends.append((bytes([0x20 + (idx & 0xF)]) + x[0:7]).ljust(8, b"\x00")) x = x[7:] idx += 1 diff --git a/python/serial.py b/python/serial.py index 9ac58862b..c2e965b76 100644 --- a/python/serial.py +++ b/python/serial.py @@ -1,5 +1,5 @@ # mimic a python serial port -class PandaSerial(object): +class PandaSerial: def __init__(self, panda, port, baud): self.panda = panda self.port = port diff --git a/python/spi.py b/python/spi.py index 48dc84d49..d34a61d56 100644 --- a/python/spi.py +++ b/python/spi.py @@ -9,7 +9,7 @@ import logging import threading from contextlib import contextmanager from functools import reduce -from typing import Callable, List, Optional +from collections.abc import Callable from .base import BaseHandle, BaseSTBootloaderHandle, TIMEOUT from .constants import McuType, MCU_TYPE_BY_IDCODE, USBPACKET_MAX_SIZE @@ -341,7 +341,7 @@ class STBootloaderSPIHandle(BaseSTBootloaderHandle): elif data != self.ACK: raise PandaSpiMissingAck - def _cmd_no_retry(self, cmd: int, data: Optional[List[bytes]] = None, read_bytes: int = 0, predata=None) -> bytes: + def _cmd_no_retry(self, cmd: int, data: list[bytes] | None = None, read_bytes: int = 0, predata=None) -> bytes: ret = b"" with self.dev.acquire() as spi: # sync + command @@ -371,7 +371,7 @@ class STBootloaderSPIHandle(BaseSTBootloaderHandle): return bytes(ret) - def _cmd(self, cmd: int, data: Optional[List[bytes]] = None, read_bytes: int = 0, predata=None) -> bytes: + def _cmd(self, cmd: int, data: list[bytes] | None = None, read_bytes: int = 0, predata=None) -> bytes: exc = PandaSpiException() for n in range(MAX_XFER_RETRY_COUNT): try: diff --git a/python/uds.py b/python/uds.py index aaa0697f9..12b4fcdc2 100644 --- a/python/uds.py +++ b/python/uds.py @@ -1,7 +1,8 @@ import time import struct from collections import deque -from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast +from typing import NamedTuple, Deque, cast +from collections.abc import Callable, Generator from enum import IntEnum from functools import partial @@ -300,8 +301,8 @@ def get_dtc_status_names(status): return result class CanClient(): - def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], - tx_addr: int, rx_addr: int, bus: int, sub_addr: Optional[int] = None, debug: bool = False): + def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], list[tuple[int, int, bytes, int]]], + tx_addr: int, rx_addr: int, bus: int, sub_addr: int | None = None, debug: bool = False): self.tx = can_send self.rx = can_recv self.tx_addr = tx_addr @@ -335,7 +336,7 @@ class CanClient(): msgs = self.rx() if drain: if self.debug: - print("CAN-RX: drain - {}".format(len(msgs))) + print(f"CAN-RX: drain - {len(msgs)}") self.rx_buff.clear() else: for rx_addr, _, rx_data, rx_bus in msgs or []: @@ -366,7 +367,7 @@ class CanClient(): except IndexError: pass # empty - def send(self, msgs: List[bytes], delay: float = 0) -> None: + def send(self, msgs: list[bytes], delay: float = 0) -> None: for i, msg in enumerate(msgs): if delay and i != 0: if self.debug: @@ -443,7 +444,7 @@ class IsoTpMessage(): if not setup_only: self._can_client.send([msg]) - def recv(self, timeout=None) -> Tuple[Optional[bytes], bool]: + def recv(self, timeout=None) -> tuple[bytes | None, bool]: if timeout is None: timeout = self.timeout @@ -566,11 +567,11 @@ def get_rx_addr_for_tx_addr(tx_addr, rx_offset=0x8): # standard 29 bit response addr (flip last two bytes) return (tx_addr & 0xFFFF0000) + (tx_addr << 8 & 0xFF00) + (tx_addr >> 8 & 0xFF) - raise ValueError("invalid tx_addr: {}".format(tx_addr)) + raise ValueError(f"invalid tx_addr: {tx_addr}") class UdsClient(): - def __init__(self, panda, tx_addr: int, rx_addr: Optional[int] = None, bus: int = 0, sub_addr: Optional[int] = None, timeout: float = 1, + def __init__(self, panda, tx_addr: int, rx_addr: int | None = None, bus: int = 0, sub_addr: int | None = None, timeout: float = 1, debug: bool = False, tx_timeout: float = 1, response_pending_timeout: float = 10): self.bus = bus self.tx_addr = tx_addr @@ -583,7 +584,7 @@ class UdsClient(): self.response_pending_timeout = response_pending_timeout # generic uds request - def _uds_request(self, service_type: SERVICE_TYPE, subfunction: Optional[int] = None, data: Optional[bytes] = None) -> bytes: + def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int | None = None, data: bytes | None = None) -> bytes: req = bytes([service_type]) if subfunction is not None: req += bytes([subfunction]) @@ -623,12 +624,12 @@ class UdsClient(): if self.debug: print("UDS-RX: response pending") continue - raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code) + raise NegativeResponseError(f'{service_desc} - {error_desc}', service_id, error_code) # positive response if service_type + 0x40 != resp_sid: resp_sid_hex = hex(resp_sid) if resp_sid is not None else None - raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex)) + raise InvalidServiceIdError(f'invalid response service id: {resp_sid_hex}') if subfunction is not None: resp_sfn = resp[1] if len(resp) > 1 else None @@ -671,7 +672,7 @@ class UdsClient(): def tester_present(self, ): self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00) - def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: Optional[bytes] = None): + def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes | None = None): write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES read_values = (timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or timing_parameter_type == TIMING_PARAMETER_TYPE.READ_EXTENDED_SET) @@ -714,8 +715,8 @@ class UdsClient(): "data": resp[2:], # TODO: parse the reset of response } - def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: Optional[BAUD_RATE_TYPE] = None): - data: Optional[bytes] + def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE | None = None): + data: bytes | None if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE: # baud_rate_type = BAUD_RATE_TYPE @@ -733,21 +734,21 @@ class UdsClient(): resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: - raise ValueError('invalid response data identifier: {} expected: {}'.format(hex(resp_id), hex(data_identifier_type))) + raise ValueError(f'invalid response data identifier: {hex(resp_id)} expected: {hex(data_identifier_type)}') return resp[2:] def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 1): if memory_address_bytes < 1 or memory_address_bytes > 4: - raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}') if memory_size_bytes < 1 or memory_size_bytes > 4: - raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}') data = bytes([memory_size_bytes << 4 | memory_address_bytes]) if memory_address >= 1 << (memory_address_bytes * 8): - raise ValueError('invalid memory_address: {}'.format(memory_address)) + raise ValueError(f'invalid memory_address: {memory_address}') data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] if memory_size >= 1 << (memory_size_bytes * 8): - raise ValueError('invalid memory_size: {}'.format(memory_size)) + raise ValueError(f'invalid memory_size: {memory_size}') data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data) @@ -758,7 +759,7 @@ class UdsClient(): resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: - raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) + raise ValueError(f'invalid response data identifier: {hex(resp_id)}') return resp[2:] # TODO: parse the response def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int): @@ -767,11 +768,11 @@ class UdsClient(): self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, - source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1): + source_definitions: list[DynamicSourceDefinition], memory_address_bytes: int = 4, memory_size_bytes: int = 1): if memory_address_bytes < 1 or memory_address_bytes > 4: - raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}') if memory_size_bytes < 1 or memory_size_bytes > 4: - raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}') data = struct.pack('!H', dynamic_data_identifier) if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER: @@ -781,15 +782,15 @@ class UdsClient(): data += bytes([memory_size_bytes << 4 | memory_address_bytes]) for s in source_definitions: if s.memory_address >= 1 << (memory_address_bytes * 8): - raise ValueError('invalid memory_address: {}'.format(s.memory_address)) + raise ValueError(f'invalid memory_address: {s.memory_address}') data += struct.pack('!I', s.memory_address)[4 - memory_address_bytes:] if s.memory_size >= 1 << (memory_size_bytes * 8): - raise ValueError('invalid memory_size: {}'.format(s.memory_size)) + raise ValueError(f'invalid memory_size: {s.memory_size}') data += struct.pack('!I', s.memory_size)[4 - memory_size_bytes:] elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER: pass else: - raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type))) + raise ValueError(f'invalid dynamic identifier type: {hex(dynamic_definition_type)}') self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data) def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes): @@ -797,20 +798,20 @@ class UdsClient(): resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: - raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) + raise ValueError(f'invalid response data identifier: {hex(resp_id)}') def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int = 4, memory_size_bytes: int = 1): if memory_address_bytes < 1 or memory_address_bytes > 4: - raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}') if memory_size_bytes < 1 or memory_size_bytes > 4: - raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}') data = bytes([memory_size_bytes << 4 | memory_address_bytes]) if memory_address >= 1 << (memory_address_bytes * 8): - raise ValueError('invalid memory_address: {}'.format(memory_address)) + raise ValueError(f'invalid memory_address: {memory_address}') data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] if memory_size >= 1 << (memory_size_bytes * 8): - raise ValueError('invalid memory_size: {}'.format(memory_size)) + raise ValueError(f'invalid memory_size: {memory_size}') data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] data += data_record @@ -864,7 +865,7 @@ class UdsClient(): resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: - raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) + raise ValueError(f'invalid response data identifier: {hex(resp_id)}') return resp[2:] def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes = b''): @@ -872,23 +873,23 @@ class UdsClient(): resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != routine_identifier_type: - raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id))) + raise ValueError(f'invalid response routine identifier: {hex(resp_id)}') return resp[2:] def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int = 4, memory_size_bytes: int = 4, data_format: int = 0x00): data = bytes([data_format]) if memory_address_bytes < 1 or memory_address_bytes > 4: - raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}') if memory_size_bytes < 1 or memory_size_bytes > 4: - raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}') data += bytes([memory_size_bytes << 4 | memory_address_bytes]) if memory_address >= 1 << (memory_address_bytes * 8): - raise ValueError('invalid memory_address: {}'.format(memory_address)) + raise ValueError(f'invalid memory_address: {memory_address}') data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] if memory_size >= 1 << (memory_size_bytes * 8): - raise ValueError('invalid memory_size: {}'.format(memory_size)) + raise ValueError(f'invalid memory_size: {memory_size}') data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data) @@ -896,7 +897,7 @@ class UdsClient(): if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0] else: - raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len)) + raise ValueError(f'invalid max_num_bytes_len: {max_num_bytes_len}') return max_num_bytes # max number of bytes per transfer data request @@ -904,16 +905,16 @@ class UdsClient(): data = bytes([data_format]) if memory_address_bytes < 1 or memory_address_bytes > 4: - raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) + raise ValueError(f'invalid memory_address_bytes: {memory_address_bytes}') if memory_size_bytes < 1 or memory_size_bytes > 4: - raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes)) + raise ValueError(f'invalid memory_size_bytes: {memory_size_bytes}') data += bytes([memory_size_bytes << 4 | memory_address_bytes]) if memory_address >= 1 << (memory_address_bytes * 8): - raise ValueError('invalid memory_address: {}'.format(memory_address)) + raise ValueError(f'invalid memory_address: {memory_address}') data += struct.pack('!I', memory_address)[4 - memory_address_bytes:] if memory_size >= 1 << (memory_size_bytes * 8): - raise ValueError('invalid memory_size: {}'.format(memory_size)) + raise ValueError(f'invalid memory_size: {memory_size}') data += struct.pack('!I', memory_size)[4 - memory_size_bytes:] resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data) @@ -921,7 +922,7 @@ class UdsClient(): if max_num_bytes_len >= 1 and max_num_bytes_len <= 4: max_num_bytes = struct.unpack('!I', (b"\x00" * (4 - max_num_bytes_len)) + resp[1:max_num_bytes_len + 1])[0] else: - raise ValueError('invalid max_num_bytes_len: {}'.format(max_num_bytes_len)) + raise ValueError(f'invalid max_num_bytes_len: {max_num_bytes_len}') return max_num_bytes # max number of bytes per transfer data request @@ -930,7 +931,7 @@ class UdsClient(): resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) resp_id = resp[0] if len(resp) > 0 else None if resp_id != block_sequence_count: - raise ValueError('invalid block_sequence_count: {}'.format(resp_id)) + raise ValueError(f'invalid block_sequence_count: {resp_id}') return resp[1:] def request_transfer_exit(self): diff --git a/setup.py b/setup.py index c419056a9..25ee66923 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - """ Panda CAN Controller Dongle ~~~~~ diff --git a/tests/bulk_write_test.py b/tests/bulk_write_test.py index bc5bad331..278766a19 100755 --- a/tests/bulk_write_test.py +++ b/tests/bulk_write_test.py @@ -2,7 +2,7 @@ import os import time import threading -from typing import Any, List +from typing import Any from panda import Panda @@ -38,7 +38,7 @@ if __name__ == "__main__": threading.Thread(target=flood_tx, args=(sender,)).start() # Receive as much as we can in a few second time period - rx: List[Any] = [] + rx: list[Any] = [] old_len = 0 start_time = time.time() while time.time() - start_time < 3 or len(rx) > old_len: diff --git a/tests/elm_car_simulator.py b/tests/elm_car_simulator.py index 119c6b682..56e825f5d 100755 --- a/tests/elm_car_simulator.py +++ b/tests/elm_car_simulator.py @@ -190,7 +190,7 @@ class ELMCarSimulator(): if pid == 0x02: # Show VIN return b"1D4GP00R55B123456" if pid == 0xFC: # test long multi message. Ligned up for LIN responses - return b''.join((struct.pack(">BBH", 0xAA, 0xAA, num + 1) for num in range(80))) + return b''.join(struct.pack(">BBH", 0xAA, 0xAA, num + 1) for num in range(80)) if pid == 0xFD: # test long multi message parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80)) return b'\xAA\xAA\xAA' + b''.join(parts) @@ -199,7 +199,7 @@ class ELMCarSimulator(): return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA' if pid == 0xFF: return b'\xAA\x00\x00' + \ - b"".join(((b'\xAA' * 5) + struct.pack(">H", num + 1) for num in range(584))) + b"".join((b'\xAA' * 5) + struct.pack(">H", num + 1) for num in range(584)) #return b"\xAA"*100#(0xFFF-3) diff --git a/tests/elm_throughput.py b/tests/elm_throughput.py index 75fb1c652..983d4a141 100755 --- a/tests/elm_throughput.py +++ b/tests/elm_throughput.py @@ -6,7 +6,7 @@ import select class Reader(threading.Thread): def __init__(self, s, *args, **kwargs): - super(Reader, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self._s = s self.__stop = False diff --git a/tests/fan/fan_tuning.py b/tests/fan/fan_tuning.py index e9158d609..2bdfab79a 100755 --- a/tests/fan/fan_tuning.py +++ b/tests/fan/fan_tuning.py @@ -33,7 +33,7 @@ def logger(event): for l in drain_serial(p)[::-1]: ns = l.decode('utf8').strip().split(' ') if len(ns) == 4: - target_rpm, rpm_fast, power, stall_count = [int(n, 16) for n in ns] + target_rpm, rpm_fast, power, stall_count = (int(n, 16) for n in ns) break dat = { diff --git a/tests/get_version.py b/tests/get_version.py index 116d8abec..a0138122b 100755 --- a/tests/get_version.py +++ b/tests/get_version.py @@ -4,4 +4,4 @@ from panda import Panda if __name__ == "__main__": for p in Panda.list(): pp = Panda(p) - print("%s: %s" % (pp.get_serial()[0], pp.get_version())) + print(f"{pp.get_serial()[0]}: {pp.get_version()}") diff --git a/tests/gmbitbang/recv.py b/tests/gmbitbang/recv.py index 73285e7e5..8dc594de4 100755 --- a/tests/gmbitbang/recv.py +++ b/tests/gmbitbang/recv.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -from typing import Optional from panda import Panda @@ -8,7 +7,7 @@ if __name__ == "__main__": p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) p.set_gmlan(bus=2) #p.can_send(0xaaa, b"\x00\x00", bus=3) - last_add: Optional[int] = None + last_add: int | None = None while True: ret = p.can_recv() if len(ret) > 0: diff --git a/tests/hitl/4_can_loopback.py b/tests/hitl/4_can_loopback.py index f00bc9547..a7e1aea1e 100644 --- a/tests/hitl/4_can_loopback.py +++ b/tests/hitl/4_can_loopback.py @@ -24,7 +24,7 @@ def test_send_recv(p, panda_jungle): saturation_pct = (comp_kbps / speed) * 100.0 assert 80 < saturation_pct < 100 - print("two pandas bus {}, 100 messages at speed {:4d}, comp speed is {:7.2f}, {:6.2f}%".format(bus, speed, comp_kbps, saturation_pct)) + print(f"two pandas bus {bus}, 100 messages at speed {speed:4d}, comp speed is {comp_kbps:7.2f}, {saturation_pct:6.2f}%") # Run tests in both directions p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) @@ -59,7 +59,7 @@ def test_latency(p, panda_jungle): r_echo = p_send.can_recv() if len(r) == 0 or len(r_echo) == 0: - print("r: {}, r_echo: {}".format(r, r_echo)) + print(f"r: {r}, r_echo: {r_echo}") assert len(r) == 1 assert len(r_echo) == 1 diff --git a/tests/hitl/helpers.py b/tests/hitl/helpers.py index d7ac4da0d..4eee43731 100644 --- a/tests/hitl/helpers.py +++ b/tests/hitl/helpers.py @@ -1,6 +1,5 @@ import time import random -from typing import Optional def get_random_can_messages(n): @@ -52,7 +51,7 @@ def time_many_sends(p, bus, p_recv=None, msg_count=100, two_pandas=False, msg_le return comp_kbps -def clear_can_buffers(panda, speed: Optional[int] = None): +def clear_can_buffers(panda, speed: int | None = None): if speed is not None: for bus in range(3): panda.set_can_speed_kbps(bus, speed) diff --git a/tests/libpanda/libpanda_py.py b/tests/libpanda/libpanda_py.py index 1b07543d6..8f876c551 100644 --- a/tests/libpanda/libpanda_py.py +++ b/tests/libpanda/libpanda_py.py @@ -1,6 +1,6 @@ import os from cffi import FFI -from typing import Any, List, Protocol +from typing import Any, Protocol from panda import LEN_TO_DLC from panda.tests.libpanda.safety_helpers import PandaSafety, setup_safety_helpers @@ -64,7 +64,7 @@ class CANPacket: returned: int extended: int addr: int - data: List[int] + data: list[int] class Panda(PandaSafety, Protocol): # CAN diff --git a/tests/message_drop_test.py b/tests/message_drop_test.py index 99d35b043..bf485e454 100755 --- a/tests/message_drop_test.py +++ b/tests/message_drop_test.py @@ -5,7 +5,7 @@ import time import struct import itertools import threading -from typing import Any, Union, List +from typing import Any from panda import Panda @@ -16,7 +16,7 @@ if JUNGLE: # Generate unique messages NUM_MESSAGES_PER_BUS = 10000 messages = [bytes(struct.pack("Q", i)) for i in range(NUM_MESSAGES_PER_BUS)] -tx_messages = list(itertools.chain.from_iterable(([[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] for msg in messages))) +tx_messages = list(itertools.chain.from_iterable([[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] for msg in messages)) def flood_tx(panda): print('Sending!') @@ -35,7 +35,7 @@ def flood_tx(panda): if __name__ == "__main__": serials = Panda.list() - receiver: Union[Panda, PandaJungle] + receiver: Panda | PandaJungle if JUNGLE: sender = Panda() receiver = PandaJungle() @@ -52,7 +52,7 @@ if __name__ == "__main__": threading.Thread(target=flood_tx, args=(sender,)).start() # Receive as much as we can, and stop when there hasn't been anything for a second - rx: List[Any] = [] + rx: list[Any] = [] old_len = 0 last_change = time.monotonic() while time.monotonic() - last_change < 1: diff --git a/tests/read_winusb_descriptors.py b/tests/read_winusb_descriptors.py index ec0534eba..5d311c9ea 100755 --- a/tests/read_winusb_descriptors.py +++ b/tests/read_winusb_descriptors.py @@ -12,23 +12,23 @@ if __name__ == "__main__": print('Microsoft OS String Descriptor') dat = p._handle.controlRead(Panda.REQUEST_IN, 0x06, 3 << 8 | 238, 0, length[0]) if DEBUG: - print('LEN: {}'.format(hex(length[0]))) + print(f'LEN: {hex(length[0])}') hexdump("".join(map(chr, dat))) ms_vendor_code = dat[16] if DEBUG: - print('MS_VENDOR_CODE: {}'.format(hex(length[0]))) + print(f'MS_VENDOR_CODE: {hex(length[0])}') print('\nMicrosoft Compatible ID Feature Descriptor') length = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, 1) if DEBUG: - print('LEN: {}'.format(hex(length[0]))) + print(f'LEN: {hex(length[0])}') dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 4, length[0]) hexdump("".join(map(chr, dat))) print('\nMicrosoft Extended Properties Feature Descriptor') length = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, 1) if DEBUG: - print('LEN: {}'.format(hex(length[0]))) + print(f'LEN: {hex(length[0])}') dat = p._handle.controlRead(Panda.REQUEST_IN, ms_vendor_code, 0, 5, length[0]) hexdump("".join(map(chr, dat))) diff --git a/tests/safety/common.py b/tests/safety/common.py index 8b70966ae..a3c22dffb 100644 --- a/tests/safety/common.py +++ b/tests/safety/common.py @@ -3,7 +3,7 @@ import abc import unittest import importlib import numpy as np -from typing import Callable, Dict, List, Optional, Tuple +from collections.abc import Callable from opendbc.can.packer import CANPacker # pylint: disable=import-error from panda import ALTERNATIVE_EXPERIENCE @@ -77,7 +77,7 @@ class PandaSafetyTestBase(unittest.TestCase): def _generic_limit_safety_check(self, msg_function: MessageFunction, min_allowed_value: float, max_allowed_value: float, min_possible_value: float, max_possible_value: float, test_delta: float = 1, inactive_value: float = 0, - msg_allowed = True, additional_setup: Optional[Callable[[float], None]] = None): + msg_allowed = True, additional_setup: Callable[[float], None] | None = None): """ Enforces that a signal within a message is only allowed to be sent within a specific range, min_allowed_value -> max_allowed_value. Tests the range of min_possible_value -> max_possible_value with a delta of test_delta. @@ -245,13 +245,13 @@ class LongitudinalAccelSafetyTest(PandaSafetyTestBase, abc.ABC): class LongitudinalGasBrakeSafetyTest(PandaSafetyTestBase, abc.ABC): MIN_BRAKE: int = 0 - MAX_BRAKE: Optional[int] = None - MAX_POSSIBLE_BRAKE: Optional[int] = None + MAX_BRAKE: int | None = None + MAX_POSSIBLE_BRAKE: int | None = None MIN_GAS: int = 0 - MAX_GAS: Optional[int] = None + MAX_GAS: int | None = None INACTIVE_GAS = 0 - MAX_POSSIBLE_GAS: Optional[int] = None + MAX_POSSIBLE_GAS: int | None = None def test_gas_brake_limits_correct(self): self.assertIsNotNone(self.MAX_POSSIBLE_BRAKE) @@ -665,9 +665,9 @@ class MotorTorqueSteeringSafetyTest(TorqueSteeringSafetyTestBase, abc.ABC): class AngleSteeringSafetyTest(PandaSafetyTestBase): DEG_TO_CAN: float - ANGLE_RATE_BP: List[float] - ANGLE_RATE_UP: List[float] # windup limit - ANGLE_RATE_DOWN: List[float] # unwind limit + ANGLE_RATE_BP: list[float] + ANGLE_RATE_UP: list[float] # windup limit + ANGLE_RATE_DOWN: list[float] # unwind limit @classmethod def setUpClass(cls): @@ -771,14 +771,14 @@ class AngleSteeringSafetyTest(PandaSafetyTestBase): class PandaSafetyTest(PandaSafetyTestBase): - TX_MSGS: Optional[List[List[int]]] = None + TX_MSGS: list[list[int]] | None = None SCANNED_ADDRS = [*range(0x800), # Entire 11-bit CAN address space *range(0x18DA00F1, 0x18DB00F1, 0x100), # 29-bit UDS physical addressing *range(0x18DB00F1, 0x18DC00F1, 0x100), # 29-bit UDS functional addressing *range(0x3300, 0x3400), # Honda 0x10400060, 0x104c006c] # GMLAN (exceptions, range/format unclear) - FWD_BLACKLISTED_ADDRS: Dict[int, List[int]] = {} # {bus: [addr]} - FWD_BUS_LOOKUP: Dict[int, int] = {} + FWD_BLACKLISTED_ADDRS: dict[int, list[int]] = {} # {bus: [addr]} + FWD_BUS_LOOKUP: dict[int, int] = {} @classmethod def setUpClass(cls): @@ -895,9 +895,9 @@ class PandaSafetyTest(PandaSafetyTestBase): @add_regen_tests class PandaCarSafetyTest(PandaSafetyTest): - STANDSTILL_THRESHOLD: Optional[float] = None + STANDSTILL_THRESHOLD: float | None = None GAS_PRESSED_THRESHOLD = 0 - RELAY_MALFUNCTION_ADDRS: Optional[Dict[int, Tuple[int, ...]]] = None + RELAY_MALFUNCTION_ADDRS: dict[int, tuple[int, ...]] | None = None @classmethod def setUpClass(cls): diff --git a/tests/safety/hyundai_common.py b/tests/safety/hyundai_common.py index 3fd36a0e1..da18671af 100644 --- a/tests/safety/hyundai_common.py +++ b/tests/safety/hyundai_common.py @@ -1,4 +1,3 @@ -from typing import Tuple import unittest import panda.tests.safety.common as common @@ -77,8 +76,8 @@ class HyundaiButtonBase: class HyundaiLongitudinalBase(common.LongitudinalAccelSafetyTest): # pylint: disable=no-member,abstract-method - DISABLED_ECU_UDS_MSG: Tuple[int, int] - DISABLED_ECU_ACTUATION_MSG: Tuple[int, int] + DISABLED_ECU_UDS_MSG: tuple[int, int] + DISABLED_ECU_ACTUATION_MSG: tuple[int, int] @classmethod def setUpClass(cls): diff --git a/tests/safety/test_gm.py b/tests/safety/test_gm.py index 587aa3f1c..28b2ad50e 100755 --- a/tests/safety/test_gm.py +++ b/tests/safety/test_gm.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 import unittest -from typing import Dict, List from panda import Panda from panda.tests.libpanda import libpanda_py import panda.tests.safety.common as common @@ -147,8 +146,8 @@ class TestGmAscmSafety(GmLongitudinalBase, TestGmSafetyBase): [0xA1, 1], [0x306, 1], [0x308, 1], [0x310, 1], # obs bus [0x315, 2], # ch bus [0x104c006c, 3], [0x10400060, 3]] # gmlan - FWD_BLACKLISTED_ADDRS: Dict[int, List[int]] = {} - FWD_BUS_LOOKUP: Dict[int, int] = {} + FWD_BLACKLISTED_ADDRS: dict[int, list[int]] = {} + FWD_BUS_LOOKUP: dict[int, int] = {} BRAKE_BUS = 2 MAX_GAS = 3072 diff --git a/tests/safety/test_honda.py b/tests/safety/test_honda.py index 08d069b5e..45f190c36 100755 --- a/tests/safety/test_honda.py +++ b/tests/safety/test_honda.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import unittest import numpy as np -from typing import Optional from panda import Panda from panda.tests.libpanda import libpanda_py @@ -170,9 +169,9 @@ class HondaPcmEnableBase(common.PandaCarSafetyTest): class HondaBase(common.PandaCarSafetyTest): MAX_BRAKE = 255 - PT_BUS: Optional[int] = None # must be set when inherited - STEER_BUS: Optional[int] = None # must be set when inherited - BUTTONS_BUS: Optional[int] = None # must be set when inherited, tx on this bus, rx on PT_BUS + PT_BUS: int | None = None # must be set when inherited + STEER_BUS: int | None = None # must be set when inherited + BUTTONS_BUS: int | None = None # must be set when inherited, tx on this bus, rx on PT_BUS STANDSTILL_THRESHOLD = 0 RELAY_MALFUNCTION_ADDRS = {0: (0xE4, 0x194)} # STEERING_CONTROL