mypy: use implicit-optional (#31590)

* mypy: set implicit-optional = true

* find and replace '| None = None' -> '= None' in function args
old-commit-hash: 80da3aee14
This commit is contained in:
Cameron Clough 2024-02-25 21:29:18 +00:00 committed by GitHub
parent b04a778fcb
commit a09300385b
25 changed files with 33 additions and 30 deletions

View File

@ -23,7 +23,7 @@ class CallbackReader:
@contextlib.contextmanager
def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encoding: str | None = None, newline: str | None = None,
def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encoding: str = None, newline: str = None,
overwrite: bool = False):
"""Write to a file atomically using a temporary file in the same directory as the destination file."""
dir_name = os.path.dirname(path)

View File

@ -8,7 +8,7 @@ from openpilot.system.hardware.hw import Paths
from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT
class OpenpilotPrefix:
def __init__(self, prefix: str | None = None, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False):
def __init__(self, prefix: str = None, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False):
self.prefix = prefix if prefix else str(uuid.uuid4().hex[0:15])
self.msgq_path = os.path.join('/dev/shm', self.prefix)
self.clean_dirs_on_exit = clean_dirs_on_exit

View File

@ -63,6 +63,9 @@ warn_unused_ignores=true
# restrict dynamic typing
warn_return_any=true
# allow implicit optionals for default args
implicit_optional = true
[tool.poetry]
name = "openpilot"

View File

@ -279,7 +279,7 @@ def upload_handler(end_event: threading.Event) -> None:
cloudlog.exception("athena.upload_handler.exception")
def _do_upload(upload_item: UploadItem, callback: Callable | None = None) -> requests.Response:
def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.Response:
path = upload_item.path
compress = False
@ -328,7 +328,7 @@ def getVersion() -> dict[str, str]:
@dispatcher.add_method
def setNavDestination(latitude: int = 0, longitude: int = 0, place_name: str | None = None, place_details: str | None = None) -> dict[str, int]:
def setNavDestination(latitude: int = 0, longitude: int = 0, place_name: str = None, place_details: str = None) -> dict[str, int]:
destination = {
"latitude": latitude,
"longitude": longitude,
@ -767,7 +767,7 @@ def backoff(retries: int) -> int:
return random.randrange(0, min(128, int(2 ** retries)))
def main(exit_event: threading.Event | None = None):
def main(exit_event: threading.Event = None):
try:
set_core_affinity([0, 1, 2, 3])
except Exception:

View File

@ -96,7 +96,7 @@ class TestAthenadMethods(unittest.TestCase):
break
@staticmethod
def _create_file(file: str, parent: str | None = None, data: bytes = b'') -> str:
def _create_file(file: str, parent: str = None, data: bytes = b'') -> str:
fn = os.path.join(Paths.log_root() if parent is None else parent, file)
os.makedirs(os.path.dirname(fn), exist_ok=True)
with open(fn, 'wb') as f:

View File

@ -159,7 +159,7 @@ class CarParts:
return copy.deepcopy(self)
@classmethod
def common(cls, add: list[EnumBase] | None = None, remove: list[EnumBase] | None = None):
def common(cls, add: list[EnumBase] = None, remove: list[EnumBase] = None):
p = [part for part in (add or []) + DEFAULT_CAR_PARTS if part not in (remove or [])]
return cls(p)

View File

@ -19,7 +19,7 @@ def make_tester_present_msg(addr, bus, subaddr=None):
return make_can_msg(addr, bytes(dat), bus)
def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: int | None = None) -> bool:
def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: int = None) -> bool:
# ISO-TP messages are always padded to 8 bytes
# tester present response is always a single frame
dat_offset = 1 if subaddr is not None else 0

View File

@ -39,7 +39,7 @@ def is_brand(brand: str, filter_brand: str | None) -> bool:
def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder],
filter_brand: str | None = None) -> dict[AddrType, set[bytes]]:
filter_brand: str = None) -> dict[AddrType, set[bytes]]:
fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set)
for fw in fw_versions:
if is_brand(fw.brand, filter_brand) and not fw.logging:

View File

@ -12,7 +12,7 @@ from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_a
class IsoTpParallelQuery:
def __init__(self, sendcan: messaging.PubSocket, logcan: messaging.SubSocket, bus: int, addrs: list[int] | list[AddrType],
request: list[bytes], response: list[bytes], response_offset: int = 0x8,
functional_addrs: list[int] | None = None, debug: bool = False, response_pending_timeout: float = 10) -> None:
functional_addrs: list[int] = None, debug: bool = False, response_pending_timeout: float = 10) -> None:
self.sendcan = sendcan
self.logcan = logcan
self.bus = bus

View File

@ -13,7 +13,7 @@ with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) a
OFFROAD_ALERTS = json.load(f)
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str | None = None) -> None:
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str = None) -> None:
if show_alert:
a = copy.copy(OFFROAD_ALERTS[alert])
a['extra'] = extra_text or ''

View File

@ -89,7 +89,7 @@ class Calibrator:
valid_blocks: int = 0,
wide_from_device_euler_init: np.ndarray = WIDE_FROM_DEVICE_EULER_INIT,
height_init: np.ndarray = HEIGHT_INIT,
smooth_from: np.ndarray | None = None) -> None:
smooth_from: np.ndarray = None) -> None:
if not np.isfinite(rpy_init).all():
self.rpy = RPY_INIT.copy()
else:

View File

@ -41,7 +41,7 @@ class PointBuckets:
def add_point(self, x: float, y: float, bucket_val: float) -> None:
raise NotImplementedError
def get_points(self, num_points: int | None = None) -> Any:
def get_points(self, num_points: int = None) -> Any:
points = np.vstack([x.arr for x in self.buckets.values()])
if num_points is None:
return points

View File

@ -109,7 +109,7 @@ class ManagerProcess(ABC):
else:
self.watchdog_seen = True
def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals | None = None) -> int | None:
def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None:
if self.proc is None:
return None

View File

@ -106,7 +106,7 @@ def distance_along_geometry(geometry: list[Coordinate], pos: Coordinate) -> floa
return total_distance_closest
def coordinate_from_param(param: str, params: Params | None = None) -> Coordinate | None:
def coordinate_from_param(param: str, params: Params = None) -> Coordinate | None:
if params is None:
params = Params()

View File

@ -68,7 +68,7 @@ class FuzzyGenerator:
else:
return self.generate_struct(field.schema)
def generate_struct(self, schema: capnp.lib.capnp._StructSchema, event: str | None = None) -> st.SearchStrategy[dict[str, Any]]:
def generate_struct(self, schema: capnp.lib.capnp._StructSchema, event: str = None) -> st.SearchStrategy[dict[str, Any]]:
full_fill: list[str] = list(schema.non_union_fields)
single_fill: list[str] = [event] if event else [self.draw(st.sampled_from(schema.union_fields))] if schema.union_fields else []
return st.fixed_dictionaries({field: self.generate_field(schema.fields[field]) for field in full_fill + single_fill})

View File

@ -627,9 +627,9 @@ def replay_process_with_name(name: str | Iterable[str], lr: LogIterable, *args,
def replay_process(
cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] | None = None,
fingerprint: str | None = None, return_all_logs: bool = False, custom_params: dict[str, Any] | None = None,
captured_output_store: dict[str, dict[str, str]] | None = None, disable_progress: bool = False
cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] = None,
fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None,
captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False
) -> list[capnp._DynamicStructReader]:
if isinstance(cfg, Iterable):
cfgs = list(cfg)

View File

@ -41,7 +41,7 @@ class DummyFrameReader(BaseFrameReader):
def regen_segment(
lr: LogIterable, frs: dict[str, Any] | None = None,
lr: LogIterable, frs: dict[str, Any] = None,
processes: Iterable[ProcessConfig] = CONFIGS, disable_tqdm: bool = False
) -> list[capnp._DynamicStructReader]:
all_msgs = sorted(lr, key=lambda m: m.logMonoTime)

View File

@ -19,7 +19,7 @@ SOURCES: list[AzureContainer] = [
DEST = OpenpilotCIContainer
def upload_route(path: str, exclude_patterns: Iterable[str] | None = None) -> None:
def upload_route(path: str, exclude_patterns: Iterable[str] = None) -> None:
if exclude_patterns is None:
exclude_patterns = [r'dcamera\.hevc']

View File

@ -18,7 +18,7 @@ OPENAI_PROMPT = "You are a professional translator from English to {language} (I
"The following sentence or word is in the GUI of a software called openpilot, translate it accordingly."
def get_language_files(languages: list[str] | None = None) -> dict[str, pathlib.Path]:
def get_language_files(languages: list[str] = None) -> dict[str, pathlib.Path]:
files = {}
with open(TRANSLATIONS_LANGUAGES) as fp:

View File

@ -71,7 +71,7 @@ def read_time_from_param(params, param) -> datetime.datetime | None:
pass
return None
def run(cmd: list[str], cwd: str | None = None) -> str:
def run(cmd: list[str], cwd: str = None) -> str:
return subprocess.check_output(cmd, cwd=cwd, stderr=subprocess.STDOUT, encoding='utf8')

View File

@ -145,7 +145,7 @@ def build_chunk_dict(chunks: list[Chunk]) -> ChunkDict:
def extract(target: list[Chunk],
sources: list[tuple[str, ChunkReader, ChunkDict]],
out_path: str,
progress: Callable[[int], None] | None = None):
progress: Callable[[int], None] = None):
stats: dict[str, int] = defaultdict(int)
mode = 'rb+' if os.path.exists(out_path) else 'wb'

View File

@ -11,7 +11,7 @@ from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import setxattr
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: bytes | None = None) -> None:
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: bytes = None) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
if lock:
@ -81,7 +81,7 @@ class UploaderTestCase(unittest.TestCase):
self.params.put("DongleId", "0000000000000000")
def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
upload_xattr: bytes | None = None, preserve_xattr: bytes | None = None) -> Path:
upload_xattr: bytes = None, preserve_xattr: bytes = None) -> Path:
file_path = Path(Paths.log_root()) / f_dir / fn
create_random_file(file_path, size_mb, lock, upload_xattr)

View File

@ -52,7 +52,7 @@ class TestUploader(UploaderTestCase):
self.end_event.set()
self.up_thread.join()
def gen_files(self, lock=False, xattr: bytes | None = None, boot=True) -> list[Path]:
def gen_files(self, lock=False, xattr: bytes = None, boot=True) -> list[Path]:
f_paths = []
for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]:
f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, upload_xattr=xattr))

View File

@ -222,7 +222,7 @@ class Uploader:
return self.upload(name, key, fn, network_type, metered)
def main(exit_event: threading.Event | None = None) -> None:
def main(exit_event: threading.Event = None) -> None:
if exit_event is None:
exit_event = threading.Event()

View File

@ -16,7 +16,7 @@ class AudioInputStreamTrack(aiortc.mediastreams.AudioStreamTrack):
pyaudio.paFloat32: 'flt',
}
def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 16000, channels: int = 1, packet_time: float = 0.020, device_index: int | None = None):
def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 16000, channels: int = 1, packet_time: float = 0.020, device_index: int = None):
super().__init__()
self.p = pyaudio.PyAudio()
@ -48,7 +48,7 @@ class AudioInputStreamTrack(aiortc.mediastreams.AudioStreamTrack):
class AudioOutputSpeaker:
def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 48000, channels: int = 2, packet_time: float = 0.2, device_index: int | None = None):
def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 48000, channels: int = 2, packet_time: float = 0.2, device_index: int = None):
chunk_size = int(packet_time * rate)
self.p = pyaudio.PyAudio()