From 17450b277dadde7c53b7100601f7c9db40ee5fc3 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Sat, 11 Feb 2023 12:25:13 -0800 Subject: [PATCH] Same mypy checks as openpilot (#1233) * no specific revision for mypy * bump to ~OP version * same warnings as openpilot * ignore * fix * rm that * switch to ignore so mypy lets us know when it's fixed --------- Co-authored-by: Adeeb Shihadeh --- mypy.ini | 11 +++++++++ python/ccp.py | 12 +++++----- python/spi_dfu.py | 4 ++-- tests/gmbitbang/recv.py | 29 +++++++++++++---------- tests/safety_replay/test_safety_replay.py | 2 +- 5 files changed, 36 insertions(+), 22 deletions(-) create mode 100644 mypy.ini diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 000000000..0b6de30d8 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,11 @@ +[mypy] +; third-party packages +ignore_missing_imports = True + +; helpful warnings +warn_redundant_casts = True +warn_unreachable = True +warn_unused_ignores = True + +; restrict dynamic typing +warn_return_any = True diff --git a/python/ccp.py b/python/ccp.py index a8c00169f..918312002 100644 --- a/python/ccp.py +++ b/python/ccp.py @@ -100,9 +100,9 @@ class CcpClient(): msgs = self._panda.can_recv() or [] if len(msgs) >= 256: print("CAN RX buffer overflow!!!", file=sys.stderr) - for rx_addr, _, rx_data, rx_bus in msgs: + for rx_addr, _, rx_data_bytearray, rx_bus in msgs: if rx_bus == self.can_bus and rx_addr == self.rx_addr: - rx_data = bytes(rx_data) # convert bytearray to bytes + rx_data = bytes(rx_data_bytearray) if self.debug: print(f"CAN-RX: {hex(rx_addr)} - 0x{bytes.hex(rx_data)}") assert len(rx_data) == 8, f"message length not 8: {len(rx_data)}" @@ -183,7 +183,7 @@ class CcpClient(): resp = self._recv_dto(0.025) # mta_addr_ext = resp[0] mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0] - return mta_addr + return mta_addr # type: ignore def download_6_bytes(self, data: bytes) -> int: if len(data) != 6: @@ -192,7 +192,7 @@ class CcpClient(): resp = self._recv_dto(0.025) # mta_addr_ext = resp[0] mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0] - return mta_addr + return mta_addr # type: ignore def upload(self, size: int) -> bytes: if size > 5: @@ -296,7 +296,7 @@ class CcpClient(): resp = self._recv_dto(0.1) # mta_addr_ext = resp[0] mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0] - return mta_addr + return mta_addr # type: ignore def program_6_bytes(self, data: bytes) -> int: if len(data) != 6: @@ -305,7 +305,7 @@ class CcpClient(): resp = self._recv_dto(0.1) # mta_addr_ext = resp[0] mta_addr = struct.unpack(f"{self.byte_order.value}I", resp[1:5])[0] - return mta_addr + return mta_addr # type: ignore def move_memory_block(self, size: int) -> None: self._send_cro(COMMAND_CODE.MOVE, struct.pack(f"{self.byte_order.value}I", size)) diff --git a/python/spi_dfu.py b/python/spi_dfu.py index 2f1ec5d2a..73d930528 100644 --- a/python/spi_dfu.py +++ b/python/spi_dfu.py @@ -37,7 +37,8 @@ class PandaSpiDFU: elif data != ACK: raise Exception("Missing ACK") - def _cmd(self, cmd, data=None, read_bytes=0): + def _cmd(self, cmd, data=None, read_bytes=0) -> bytes: + ret = b"" with self.dev.acquire() as spi: # sync spi.xfer([SYNC, ]) @@ -53,7 +54,6 @@ class PandaSpiDFU: self._get_ack(spi, timeout=20) # receive - ret = None if read_bytes > 0: # send busy byte ret = spi.xfer([0x00, ]*(read_bytes + 1))[1:] diff --git a/tests/gmbitbang/recv.py b/tests/gmbitbang/recv.py index 3949c424d..73285e7e5 100755 --- a/tests/gmbitbang/recv.py +++ b/tests/gmbitbang/recv.py @@ -1,16 +1,19 @@ #!/usr/bin/env python3 +from typing import Optional + from panda import Panda -p = Panda() -p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) -p.set_gmlan(bus=2) -#p.can_send(0xaaa, b"\x00\x00", bus=3) -last_add = None -while 1: - ret = p.can_recv() - if len(ret) > 0: - add = ret[0][0] - if last_add is not None and add != last_add + 1: - print("MISS: ", last_add, add) - last_add = add - print(ret) +if __name__ == "__main__": + p = Panda() + p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + p.set_gmlan(bus=2) + #p.can_send(0xaaa, b"\x00\x00", bus=3) + last_add: Optional[int] = None + while True: + ret = p.can_recv() + if len(ret) > 0: + add = ret[0][0] + if last_add is not None and add != last_add + 1: + print("MISS: ", last_add, add) + last_add = add + print(ret) diff --git a/tests/safety_replay/test_safety_replay.py b/tests/safety_replay/test_safety_replay.py index 697ebc497..dc61eca4f 100755 --- a/tests/safety_replay/test_safety_replay.py +++ b/tests/safety_replay/test_safety_replay.py @@ -53,6 +53,6 @@ if __name__ == "__main__": if not replay_drive(lr, mode, param, alt_exp): failed.append(route) - for f in failed: # type: ignore + for f in failed: print(f"\n**** failed on {f} ****") assert len(failed) == 0, "\nfailed on %d logs" % len(failed)