mypy: use implicit-optional (#31590)
* mypy: set implicit-optional = true * find and replace '| None = None' -> '= None' in function args
This commit is contained in:
parent
3520d47955
commit
80da3aee14
|
@ -23,7 +23,7 @@ class CallbackReader:
|
|||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encoding: str | None = None, newline: str | None = None,
|
||||
def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encoding: str = None, newline: str = None,
|
||||
overwrite: bool = False):
|
||||
"""Write to a file atomically using a temporary file in the same directory as the destination file."""
|
||||
dir_name = os.path.dirname(path)
|
||||
|
|
|
@ -8,7 +8,7 @@ from openpilot.system.hardware.hw import Paths
|
|||
from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT
|
||||
|
||||
class OpenpilotPrefix:
|
||||
def __init__(self, prefix: str | None = None, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False):
|
||||
def __init__(self, prefix: str = None, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False):
|
||||
self.prefix = prefix if prefix else str(uuid.uuid4().hex[0:15])
|
||||
self.msgq_path = os.path.join('/dev/shm', self.prefix)
|
||||
self.clean_dirs_on_exit = clean_dirs_on_exit
|
||||
|
|
|
@ -63,6 +63,9 @@ warn_unused_ignores=true
|
|||
# restrict dynamic typing
|
||||
warn_return_any=true
|
||||
|
||||
# allow implicit optionals for default args
|
||||
implicit_optional = true
|
||||
|
||||
|
||||
[tool.poetry]
|
||||
name = "openpilot"
|
||||
|
|
|
@ -279,7 +279,7 @@ def upload_handler(end_event: threading.Event) -> None:
|
|||
cloudlog.exception("athena.upload_handler.exception")
|
||||
|
||||
|
||||
def _do_upload(upload_item: UploadItem, callback: Callable | None = None) -> requests.Response:
|
||||
def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.Response:
|
||||
path = upload_item.path
|
||||
compress = False
|
||||
|
||||
|
@ -328,7 +328,7 @@ def getVersion() -> dict[str, str]:
|
|||
|
||||
|
||||
@dispatcher.add_method
|
||||
def setNavDestination(latitude: int = 0, longitude: int = 0, place_name: str | None = None, place_details: str | None = None) -> dict[str, int]:
|
||||
def setNavDestination(latitude: int = 0, longitude: int = 0, place_name: str = None, place_details: str = None) -> dict[str, int]:
|
||||
destination = {
|
||||
"latitude": latitude,
|
||||
"longitude": longitude,
|
||||
|
@ -767,7 +767,7 @@ def backoff(retries: int) -> int:
|
|||
return random.randrange(0, min(128, int(2 ** retries)))
|
||||
|
||||
|
||||
def main(exit_event: threading.Event | None = None):
|
||||
def main(exit_event: threading.Event = None):
|
||||
try:
|
||||
set_core_affinity([0, 1, 2, 3])
|
||||
except Exception:
|
||||
|
|
|
@ -96,7 +96,7 @@ class TestAthenadMethods(unittest.TestCase):
|
|||
break
|
||||
|
||||
@staticmethod
|
||||
def _create_file(file: str, parent: str | None = None, data: bytes = b'') -> str:
|
||||
def _create_file(file: str, parent: str = None, data: bytes = b'') -> str:
|
||||
fn = os.path.join(Paths.log_root() if parent is None else parent, file)
|
||||
os.makedirs(os.path.dirname(fn), exist_ok=True)
|
||||
with open(fn, 'wb') as f:
|
||||
|
|
|
@ -159,7 +159,7 @@ class CarParts:
|
|||
return copy.deepcopy(self)
|
||||
|
||||
@classmethod
|
||||
def common(cls, add: list[EnumBase] | None = None, remove: list[EnumBase] | None = None):
|
||||
def common(cls, add: list[EnumBase] = None, remove: list[EnumBase] = None):
|
||||
p = [part for part in (add or []) + DEFAULT_CAR_PARTS if part not in (remove or [])]
|
||||
return cls(p)
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ def make_tester_present_msg(addr, bus, subaddr=None):
|
|||
return make_can_msg(addr, bytes(dat), bus)
|
||||
|
||||
|
||||
def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: int | None = None) -> bool:
|
||||
def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: int = None) -> bool:
|
||||
# ISO-TP messages are always padded to 8 bytes
|
||||
# tester present response is always a single frame
|
||||
dat_offset = 1 if subaddr is not None else 0
|
||||
|
|
|
@ -39,7 +39,7 @@ def is_brand(brand: str, filter_brand: str | None) -> bool:
|
|||
|
||||
|
||||
def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder],
|
||||
filter_brand: str | None = None) -> dict[AddrType, set[bytes]]:
|
||||
filter_brand: str = None) -> dict[AddrType, set[bytes]]:
|
||||
fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set)
|
||||
for fw in fw_versions:
|
||||
if is_brand(fw.brand, filter_brand) and not fw.logging:
|
||||
|
|
|
@ -12,7 +12,7 @@ from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_a
|
|||
class IsoTpParallelQuery:
|
||||
def __init__(self, sendcan: messaging.PubSocket, logcan: messaging.SubSocket, bus: int, addrs: list[int] | list[AddrType],
|
||||
request: list[bytes], response: list[bytes], response_offset: int = 0x8,
|
||||
functional_addrs: list[int] | None = None, debug: bool = False, response_pending_timeout: float = 10) -> None:
|
||||
functional_addrs: list[int] = None, debug: bool = False, response_pending_timeout: float = 10) -> None:
|
||||
self.sendcan = sendcan
|
||||
self.logcan = logcan
|
||||
self.bus = bus
|
||||
|
|
|
@ -13,7 +13,7 @@ with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) a
|
|||
OFFROAD_ALERTS = json.load(f)
|
||||
|
||||
|
||||
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str | None = None) -> None:
|
||||
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str = None) -> None:
|
||||
if show_alert:
|
||||
a = copy.copy(OFFROAD_ALERTS[alert])
|
||||
a['extra'] = extra_text or ''
|
||||
|
|
|
@ -89,7 +89,7 @@ class Calibrator:
|
|||
valid_blocks: int = 0,
|
||||
wide_from_device_euler_init: np.ndarray = WIDE_FROM_DEVICE_EULER_INIT,
|
||||
height_init: np.ndarray = HEIGHT_INIT,
|
||||
smooth_from: np.ndarray | None = None) -> None:
|
||||
smooth_from: np.ndarray = None) -> None:
|
||||
if not np.isfinite(rpy_init).all():
|
||||
self.rpy = RPY_INIT.copy()
|
||||
else:
|
||||
|
|
|
@ -41,7 +41,7 @@ class PointBuckets:
|
|||
def add_point(self, x: float, y: float, bucket_val: float) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_points(self, num_points: int | None = None) -> Any:
|
||||
def get_points(self, num_points: int = None) -> Any:
|
||||
points = np.vstack([x.arr for x in self.buckets.values()])
|
||||
if num_points is None:
|
||||
return points
|
||||
|
|
|
@ -109,7 +109,7 @@ class ManagerProcess(ABC):
|
|||
else:
|
||||
self.watchdog_seen = True
|
||||
|
||||
def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals | None = None) -> int | None:
|
||||
def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals = None) -> int | None:
|
||||
if self.proc is None:
|
||||
return None
|
||||
|
||||
|
|
|
@ -106,7 +106,7 @@ def distance_along_geometry(geometry: list[Coordinate], pos: Coordinate) -> floa
|
|||
return total_distance_closest
|
||||
|
||||
|
||||
def coordinate_from_param(param: str, params: Params | None = None) -> Coordinate | None:
|
||||
def coordinate_from_param(param: str, params: Params = None) -> Coordinate | None:
|
||||
if params is None:
|
||||
params = Params()
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ class FuzzyGenerator:
|
|||
else:
|
||||
return self.generate_struct(field.schema)
|
||||
|
||||
def generate_struct(self, schema: capnp.lib.capnp._StructSchema, event: str | None = None) -> st.SearchStrategy[dict[str, Any]]:
|
||||
def generate_struct(self, schema: capnp.lib.capnp._StructSchema, event: str = None) -> st.SearchStrategy[dict[str, Any]]:
|
||||
full_fill: list[str] = list(schema.non_union_fields)
|
||||
single_fill: list[str] = [event] if event else [self.draw(st.sampled_from(schema.union_fields))] if schema.union_fields else []
|
||||
return st.fixed_dictionaries({field: self.generate_field(schema.fields[field]) for field in full_fill + single_fill})
|
||||
|
|
|
@ -627,9 +627,9 @@ def replay_process_with_name(name: str | Iterable[str], lr: LogIterable, *args,
|
|||
|
||||
|
||||
def replay_process(
|
||||
cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] | None = None,
|
||||
fingerprint: str | None = None, return_all_logs: bool = False, custom_params: dict[str, Any] | None = None,
|
||||
captured_output_store: dict[str, dict[str, str]] | None = None, disable_progress: bool = False
|
||||
cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] = None,
|
||||
fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None,
|
||||
captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False
|
||||
) -> list[capnp._DynamicStructReader]:
|
||||
if isinstance(cfg, Iterable):
|
||||
cfgs = list(cfg)
|
||||
|
|
|
@ -41,7 +41,7 @@ class DummyFrameReader(BaseFrameReader):
|
|||
|
||||
|
||||
def regen_segment(
|
||||
lr: LogIterable, frs: dict[str, Any] | None = None,
|
||||
lr: LogIterable, frs: dict[str, Any] = None,
|
||||
processes: Iterable[ProcessConfig] = CONFIGS, disable_tqdm: bool = False
|
||||
) -> list[capnp._DynamicStructReader]:
|
||||
all_msgs = sorted(lr, key=lambda m: m.logMonoTime)
|
||||
|
|
|
@ -19,7 +19,7 @@ SOURCES: list[AzureContainer] = [
|
|||
|
||||
DEST = OpenpilotCIContainer
|
||||
|
||||
def upload_route(path: str, exclude_patterns: Iterable[str] | None = None) -> None:
|
||||
def upload_route(path: str, exclude_patterns: Iterable[str] = None) -> None:
|
||||
if exclude_patterns is None:
|
||||
exclude_patterns = [r'dcamera\.hevc']
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ OPENAI_PROMPT = "You are a professional translator from English to {language} (I
|
|||
"The following sentence or word is in the GUI of a software called openpilot, translate it accordingly."
|
||||
|
||||
|
||||
def get_language_files(languages: list[str] | None = None) -> dict[str, pathlib.Path]:
|
||||
def get_language_files(languages: list[str] = None) -> dict[str, pathlib.Path]:
|
||||
files = {}
|
||||
|
||||
with open(TRANSLATIONS_LANGUAGES) as fp:
|
||||
|
|
|
@ -71,7 +71,7 @@ def read_time_from_param(params, param) -> datetime.datetime | None:
|
|||
pass
|
||||
return None
|
||||
|
||||
def run(cmd: list[str], cwd: str | None = None) -> str:
|
||||
def run(cmd: list[str], cwd: str = None) -> str:
|
||||
return subprocess.check_output(cmd, cwd=cwd, stderr=subprocess.STDOUT, encoding='utf8')
|
||||
|
||||
|
||||
|
|
|
@ -145,7 +145,7 @@ def build_chunk_dict(chunks: list[Chunk]) -> ChunkDict:
|
|||
def extract(target: list[Chunk],
|
||||
sources: list[tuple[str, ChunkReader, ChunkDict]],
|
||||
out_path: str,
|
||||
progress: Callable[[int], None] | None = None):
|
||||
progress: Callable[[int], None] = None):
|
||||
stats: dict[str, int] = defaultdict(int)
|
||||
|
||||
mode = 'rb+' if os.path.exists(out_path) else 'wb'
|
||||
|
|
|
@ -11,7 +11,7 @@ from openpilot.system.hardware.hw import Paths
|
|||
from openpilot.system.loggerd.xattr_cache import setxattr
|
||||
|
||||
|
||||
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: bytes | None = None) -> None:
|
||||
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: bytes = None) -> None:
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if lock:
|
||||
|
@ -81,7 +81,7 @@ class UploaderTestCase(unittest.TestCase):
|
|||
self.params.put("DongleId", "0000000000000000")
|
||||
|
||||
def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
|
||||
upload_xattr: bytes | None = None, preserve_xattr: bytes | None = None) -> Path:
|
||||
upload_xattr: bytes = None, preserve_xattr: bytes = None) -> Path:
|
||||
file_path = Path(Paths.log_root()) / f_dir / fn
|
||||
create_random_file(file_path, size_mb, lock, upload_xattr)
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ class TestUploader(UploaderTestCase):
|
|||
self.end_event.set()
|
||||
self.up_thread.join()
|
||||
|
||||
def gen_files(self, lock=False, xattr: bytes | None = None, boot=True) -> list[Path]:
|
||||
def gen_files(self, lock=False, xattr: bytes = None, boot=True) -> list[Path]:
|
||||
f_paths = []
|
||||
for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]:
|
||||
f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, upload_xattr=xattr))
|
||||
|
|
|
@ -222,7 +222,7 @@ class Uploader:
|
|||
return self.upload(name, key, fn, network_type, metered)
|
||||
|
||||
|
||||
def main(exit_event: threading.Event | None = None) -> None:
|
||||
def main(exit_event: threading.Event = None) -> None:
|
||||
if exit_event is None:
|
||||
exit_event = threading.Event()
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ class AudioInputStreamTrack(aiortc.mediastreams.AudioStreamTrack):
|
|||
pyaudio.paFloat32: 'flt',
|
||||
}
|
||||
|
||||
def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 16000, channels: int = 1, packet_time: float = 0.020, device_index: int | None = None):
|
||||
def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 16000, channels: int = 1, packet_time: float = 0.020, device_index: int = None):
|
||||
super().__init__()
|
||||
|
||||
self.p = pyaudio.PyAudio()
|
||||
|
@ -48,7 +48,7 @@ class AudioInputStreamTrack(aiortc.mediastreams.AudioStreamTrack):
|
|||
|
||||
|
||||
class AudioOutputSpeaker:
|
||||
def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 48000, channels: int = 2, packet_time: float = 0.2, device_index: int | None = None):
|
||||
def __init__(self, audio_format: int = pyaudio.paInt16, rate: int = 48000, channels: int = 2, packet_time: float = 0.2, device_index: int = None):
|
||||
|
||||
chunk_size = int(packet_time * rate)
|
||||
self.p = pyaudio.PyAudio()
|
||||
|
|
Loading…
Reference in New Issue