add mypy check for return-any (#24379)

* add mypy check for return-any

* remove unused import

* typing

* remove unnecessary variable typing

* cleanup

* cleanup

* bump submodules

* small fixes

* only a problem on mac

Co-authored-by: Willem Melching <willem.melching@gmail.com>
This commit is contained in:
Dylan Herman 2022-05-12 07:59:00 -05:00 committed by GitHub
parent 3bfe4a691c
commit d2eef1955c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 64 additions and 49 deletions

View File

@ -23,6 +23,7 @@ repos:
additional_dependencies: ['lxml', 'numpy', 'types-atomicwrites', 'types-pycurl', 'types-requests', 'types-certifi']
args:
- --warn-redundant-casts
- --warn-return-any
- --warn-unreachable
- --warn-unused-ignores
#- --html-report=/home/batman/openpilot

2
cereal

@ -1 +1 @@
Subproject commit 67e5c5ca378fcf6db53e968e5ad9e4a98d32ed0f
Subproject commit 8e8831f8e03deb62dd5fcfc638e134cdc8e9adbf

View File

@ -36,12 +36,12 @@ class Priority:
def set_realtime_priority(level: int) -> None:
if not PC:
os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(level)) # type: ignore[attr-defined]
os.sched_setscheduler(0, os.SCHED_FIFO, os.sched_param(level)) # type: ignore[attr-defined] # pylint: disable=no-member
def set_core_affinity(cores: List[int]) -> None:
if not PC:
os.sched_setaffinity(0, cores)
os.sched_setaffinity(0, cores) # pylint: disable=no-member
def config_realtime_process(cores: Union[int, List[int]], priority: int) -> None:

View File

@ -1,5 +1,6 @@
import os
from cffi import FFI
from typing import Any, List
# Workaround for the EON/termux build of Python having os.*xattr removed.
ffi = FFI()
@ -11,7 +12,7 @@ int removexattr(const char *path, const char *name);
""")
libc = ffi.dlopen(None)
def setxattr(path, name, value, flags=0):
def setxattr(path, name, value, flags=0) -> None:
path = path.encode()
name = name.encode()
if libc.setxattr(path, name, value, len(value), flags) == -1:
@ -29,7 +30,7 @@ def getxattr(path, name, size=128):
raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: getxattr({path}, {name}, {size})")
return ffi.buffer(value)[:l]
def listxattr(path, size=128):
def listxattr(path, size=128) -> List[Any]:
path = path.encode()
attrs = ffi.new(f"char[{size}]")
l = libc.listxattr(path, attrs, size)
@ -38,7 +39,7 @@ def listxattr(path, size=128):
# attrs is b'\0' delimited values (so chop off trailing empty item)
return [a.decode() for a in ffi.buffer(attrs)[:l].split(b"\0")[0:-1]]
def removexattr(path, name):
def removexattr(path, name) -> None:
path = path.encode()
name = name.encode()
if libc.removexattr(path, name) == -1:

2
panda

@ -1 +1 @@
Subproject commit eb662e4e5014a3fc3c04512d708f61080c7707c1
Subproject commit d5bd81e5b517c79e164d87b96355e6bc75915da0

View File

@ -6,7 +6,7 @@ from multiprocessing import Process
from setproctitle import setproctitle # pylint: disable=no-name-in-module
def waste(core):
os.sched_setaffinity(0, [core,])
os.sched_setaffinity(0, [core,]) # pylint: disable=no-member
m1 = np.zeros((200, 200)) + 0.8
m2 = np.zeros((200, 200)) + 1.2

View File

@ -23,13 +23,13 @@ def is_registered_device() -> bool:
return dongle not in (None, UNREGISTERED_DONGLE_ID)
def register(show_spinner=False) -> str:
def register(show_spinner=False) -> Optional[str]:
params = Params()
params.put("SubscriberInfo", HARDWARE.get_subscriber_info())
IMEI = params.get("IMEI", encoding='utf8')
HardwareSerial = params.get("HardwareSerial", encoding='utf8')
dongle_id = params.get("DongleId", encoding='utf8')
dongle_id: Optional[str] = params.get("DongleId", encoding='utf8')
needs_registration = None in (IMEI, HardwareSerial, dongle_id)
pubkey = Path(PERSIST+"/comma/id_rsa.pub")

View File

@ -4,7 +4,7 @@ import os
import usb1
import time
import subprocess
from typing import NoReturn
from typing import List, NoReturn
from functools import cmp_to_key
from panda import DEFAULT_FW_FN, DEFAULT_H7_FW_FN, MCU_TYPE_H7, Panda, PandaDFU
@ -102,7 +102,7 @@ def main() -> NoReturn:
cloudlog.info(f"{len(panda_serials)} panda(s) found, connecting - {panda_serials}")
# Flash pandas
pandas = []
pandas: List[Panda] = []
for serial in panda_serials:
pandas.append(flash_panda(serial))
@ -119,7 +119,7 @@ def main() -> NoReturn:
# sort pandas to have deterministic order
pandas.sort(key=cmp_to_key(panda_sort_cmp))
panda_serials = list(map(lambda p: p.get_usb_serial(), pandas))
panda_serials = list(map(lambda p: p.get_usb_serial(), pandas)) # type: ignore
# log panda fw versions
params.put("PandaSignatures", b','.join(p.get_signature() for p in pandas))

View File

@ -1,6 +1,7 @@
# functions common among cars
from cereal import car
from common.numpy_fast import clip
from typing import Dict
# kg of standard extra cargo to count for drive, gas, etc...
STD_CARGO_KG = 136.
@ -41,7 +42,7 @@ def scale_tire_stiffness(mass, wheelbase, center_to_front, tire_stiffness_factor
return tire_stiffness_front, tire_stiffness_rear
def dbc_dict(pt_dbc, radar_dbc, chassis_dbc=None, body_dbc=None):
def dbc_dict(pt_dbc, radar_dbc, chassis_dbc=None, body_dbc=None) -> Dict[str, str]:
return {'pt': pt_dbc, 'radar': radar_dbc, 'chassis': chassis_dbc, 'body': body_dbc}

View File

@ -45,7 +45,8 @@ def get_all_car_info() -> List[CarInfo]:
all_car_info.append(_car_info.init(CP, non_tested_cars, ALL_FOOTNOTES))
# Sort cars by make and model + year
return natsorted(all_car_info, key=lambda car: (car.make + car.model).lower())
sorted_cars: List[CarInfo] = natsorted(all_car_info, key=lambda car: (car.make + car.model).lower())
return sorted_cars
def sort_by_tier(all_car_info: List[CarInfo]) -> Dict[Tier, List[CarInfo]]:
@ -65,8 +66,9 @@ def generate_cars_md(all_car_info: List[CarInfo], template_fn: str) -> str:
template = jinja2.Template(f.read(), trim_blocks=True, lstrip_blocks=True)
footnotes = [fn.value.text for fn in ALL_FOOTNOTES]
return template.render(tiers=sort_by_tier(all_car_info), all_car_info=all_car_info,
footnotes=footnotes, Star=Star, Column=Column)
cars_md: str = template.render(tiers=sort_by_tier(all_car_info), all_car_info=all_car_info,
footnotes=footnotes, Star=Star, Column=Column)
return cars_md
if __name__ == "__main__":

View File

@ -41,7 +41,7 @@ class DBC_FILES:
mqb = "vw_mqb_2010" # Used for all cars with MQB-style CAN messaging
DBC = defaultdict(lambda: dbc_dict(DBC_FILES.mqb, None)) # type: Dict[str, Dict[str, str]]
DBC: Dict[str, Dict[str, str]] = defaultdict(lambda: dbc_dict(DBC_FILES.mqb, None))
BUTTON_STATES = {
"accelCruise": False,

View File

@ -135,6 +135,8 @@ class Alert:
return f"{self.alert_text_1}/{self.alert_text_2} {self.priority} {self.visual_alert} {self.audible_alert}"
def __gt__(self, alert2) -> bool:
if not isinstance(alert2, Alert):
return False
return self.priority > alert2.priority

View File

@ -29,22 +29,22 @@ class VehicleModel:
CP: Car Parameters
"""
# for math readability, convert long names car params into short names
self.m = CP.mass
self.j = CP.rotationalInertia
self.l = CP.wheelbase
self.aF = CP.centerToFront
self.aR = CP.wheelbase - CP.centerToFront
self.chi = CP.steerRatioRear
self.m: float = CP.mass
self.j: float = CP.rotationalInertia
self.l: float = CP.wheelbase
self.aF: float = CP.centerToFront
self.aR: float = CP.wheelbase - CP.centerToFront
self.chi: float = CP.steerRatioRear
self.cF_orig = CP.tireStiffnessFront
self.cR_orig = CP.tireStiffnessRear
self.cF_orig: float = CP.tireStiffnessFront
self.cR_orig: float = CP.tireStiffnessRear
self.update_params(1.0, CP.steerRatio)
def update_params(self, stiffness_factor: float, steer_ratio: float) -> None:
"""Update the vehicle model with a new stiffness factor and steer ratio"""
self.cF = stiffness_factor * self.cF_orig
self.cR = stiffness_factor * self.cR_orig
self.sR = steer_ratio
self.cF: float = stiffness_factor * self.cF_orig
self.cR: float = stiffness_factor * self.cR_orig
self.sR: float = steer_ratio
def steady_state_sol(self, sa: float, u: float, roll: float) -> np.ndarray:
"""Returns the steady state solution.
@ -221,10 +221,10 @@ def dyn_ss_sol(sa: float, u: float, roll: float, VM: VehicleModel) -> np.ndarray
"""
A, B = create_dyn_state_matrices(u, VM)
inp = np.array([[sa], [roll]])
return -solve(A, B) @ inp
return -solve(A, B) @ inp # type: ignore
def calc_slip_factor(VM):
def calc_slip_factor(VM: VehicleModel) -> float:
"""The slip factor is a measure of how the curvature changes with speed
it's positive for Oversteering vehicle, negative (usual case) otherwise.
"""

View File

@ -88,7 +88,7 @@ if __name__ == "__main__":
pass
print("Top CPU usage:")
for k, v in sorted(procs.items(), key=lambda item: item[1], reverse=True)[:10]:
for k, v in sorted(procs.items(), key=lambda item: item[1], reverse=True)[:10]: # type: ignore
print(f"{k.rjust(70)} {v:.2f} %")
print()

View File

@ -25,7 +25,7 @@ if __name__ == "__main__":
# Remove message generated by the process under test and merge in the new messages
produces = {o.which() for o in outputs}
inputs = [i for i in inputs if i.which() not in produces]
outputs = sorted(inputs + outputs, key=lambda x: x.logMonoTime)
outputs = sorted(inputs + outputs, key=lambda x: x.logMonoTime) # type: ignore
fn = f"{args.route}_{args.process}.bz2"
save_log(fn, outputs)

View File

@ -7,7 +7,7 @@ import struct
import subprocess
import time
import os
from typing import Generator
from typing import Dict, Generator, Union
SPARSE_CHUNK_FMT = struct.Struct('H2xI4x')
@ -99,26 +99,30 @@ def get_partition_path(target_slot_number: int, partition: dict) -> str:
return path
def verify_partition(target_slot_number: int, partition: dict) -> bool:
def verify_partition(target_slot_number: int, partition: Dict[str, Union[str, int]]) -> bool:
full_check = partition['full_check']
path = get_partition_path(target_slot_number, partition)
partition_size = partition['size']
if not isinstance(partition['size'], int):
return False
partition_size: int = partition['size']
if not isinstance(partition['hash_raw'], str):
return False
partition_hash: str = partition['hash_raw']
with open(path, 'rb+') as out:
if full_check:
raw_hash = hashlib.sha256()
pos = 0
chunk_size = 1024 * 1024
pos, chunk_size = 0, 1024 * 1024
while pos < partition_size:
n = min(chunk_size, partition_size - pos)
raw_hash.update(out.read(n))
pos += n
return raw_hash.hexdigest().lower() == partition['hash_raw'].lower()
return raw_hash.hexdigest().lower() == partition_hash.lower()
else:
out.seek(partition_size)
return out.read(64) == partition['hash_raw'].lower().encode()
return out.read(64) == partition_hash.lower().encode()
def clear_partition_hash(target_slot_number: int, partition: dict) -> None:

View File

@ -48,7 +48,7 @@ class Calibration:
def is_calibration_valid(rpy: np.ndarray) -> bool:
return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1])
return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1]) # type: ignore
def sanity_clip(rpy: np.ndarray) -> np.ndarray:

View File

@ -8,10 +8,10 @@ from selfdrive.manager.process import PythonProcess, NativeProcess, DaemonProces
WEBCAM = os.getenv("USE_WEBCAM") is not None
def driverview(started: bool, params: Params, CP: car.CarParams) -> bool:
return params.get_bool("IsDriverViewEnabled")
return params.get_bool("IsDriverViewEnabled") # type: ignore
def notcar(started: bool, params: Params, CP: car.CarParams) -> bool:
return CP.notCar
return CP.notCar # type: ignore
def logging(started, params, CP: car.CarParams) -> bool:
run = (not CP.notCar) or not params.get_bool("DisableLogging")

View File

@ -31,7 +31,7 @@ if __name__ == "__main__":
tm = m
if tm is None:
continue
if not i.measurementStatus.measurementNotUsable and i.measurementStatus.satelliteTimeIsKnown:
sat_time = (i.unfilteredMeasurementIntegral + i.unfilteredMeasurementFraction + i.latency) / 1000
ublox_psuedorange = tm.pseudorange
@ -56,7 +56,7 @@ if __name__ == "__main__":
pr_err /= len(car)
speed_err /= len(car)
print("avg psuedorange err %f avg speed err %f" % (pr_err, speed_err))
for c in sorted(car, key=lambda x: abs(x[1] - x[3] - pr_err)):
for c in sorted(car, key=lambda x: abs(x[1] - x[3] - pr_err)): # type: ignore
svid, ublox_psuedorange, ublox_speed, qcom_psuedorange, qcom_speed, cno = c
print("svid: %3d pseudorange: %10.2f m speed: %8.2f m/s meas: %12.2f speed: %10.2f meas_err: %10.3f speed_err: %8.3f cno: %d" %
(svid, ublox_psuedorange, ublox_speed, qcom_psuedorange, qcom_speed,

View File

@ -311,7 +311,7 @@ def fetch_update(wait_helper: WaitTimeHelper) -> bool:
cur_hash = run(["git", "rev-parse", "HEAD"], OVERLAY_MERGED).rstrip()
upstream_hash = run(["git", "rev-parse", "@{u}"], OVERLAY_MERGED).rstrip()
new_version = cur_hash != upstream_hash
new_version: bool = cur_hash != upstream_hash
git_fetch_result = check_git_fetch_result(git_fetch_output)
cloudlog.info(f"comparing {cur_hash} to {upstream_hash}")

View File

@ -55,7 +55,7 @@ def get_origin(default: Optional[str] = None) -> Optional[str]:
@cache
def get_normalized_origin(default: Optional[str] = None) -> Optional[str]:
origin = get_origin()
origin: Optional[str] = get_origin()
if origin is None:
return default
@ -74,7 +74,7 @@ def get_version() -> str:
@cache
def get_short_version() -> str:
return get_version().split('-')[0]
return get_version().split('-')[0] # type: ignore
@cache
def is_prebuilt() -> bool:
@ -85,7 +85,7 @@ def is_prebuilt() -> bool:
def is_comma_remote() -> bool:
# note to fork maintainers, this is used for release metrics. please do not
# touch this to get rid of the orange startup alert. there's better ways to do that
origin = get_origin()
origin: Optional[str] = get_origin()
if origin is None:
return False

View File

@ -36,9 +36,13 @@ class Bootlog:
return timestamp_to_datetime(self._timestamp)
def __eq__(self, b) -> bool:
if not isinstance(b, Bootlog):
return False
return self.datetime == b.datetime
def __lt__(self, b) -> bool:
if not isinstance(b, Bootlog):
return False
return self.datetime < b.datetime