From b64d6fa5d2ed9dc0cf07994d6dcabcbb9ba4e0b4 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Sun, 13 Oct 2019 20:43:55 -0700 Subject: [PATCH] typing --- python/uds.py | 63 ++++++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/python/uds.py b/python/uds.py index 438a2ab7..76bae266 100644 --- a/python/uds.py +++ b/python/uds.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import time import struct +from typing import NamedTuple, List from enum import IntEnum from queue import Queue, Empty from threading import Thread @@ -141,6 +142,12 @@ class DYNAMIC_DEFINITION_TYPE(IntEnum): DEFINE_BY_MEMORY_ADDRESS = 2 CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3 +class DynamicSourceDefinition(NamedTuple): + data_identifier: int + position: int + memory_size: int + memory_address: int + class DTC_GROUP_TYPE(IntEnum): EMISSIONS = 0x000000 ALL = 0xFFFFFF @@ -185,7 +192,7 @@ class DTC_SEVERITY_MASK_TYPE(IntEnum): CHECK_IMMEDIATELY = 0x80 ALL = 0xE0 -class CONTROL_OPTION_TYPE(IntEnum): +class CONTROL_PARAMETER_TYPE(IntEnum): RETURN_CONTROL_TO_ECU = 0 RESET_TO_DEFAULT = 1 FREEZE_CURRENT_STATE = 2 @@ -261,7 +268,7 @@ _negative_response_codes = { } class UdsClient(): - def __init__(self, panda, tx_addr, rx_addr=None, bus=0, timeout=10, debug=False): + def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: int=10, debug: bool=False): self.panda = panda self.bus = bus self.tx_addr = tx_addr @@ -284,7 +291,7 @@ class UdsClient(): self.can_reader_t.daemon = True self.can_reader_t.start() - def _isotp_thread(self, debug): + def _isotp_thread(self, debug: bool=False): try: rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True} tx_frame = {"size": 0, "data": b"", "idx": 0, "done": True} @@ -389,7 +396,7 @@ class UdsClient(): self.rx_queue.put(None) # generic uds request - def _uds_request(self, service_type, subfunction=None, data=None): + def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes: req = bytes([service_type]) if subfunction is not None: req += bytes([subfunction]) @@ -441,17 +448,17 @@ class UdsClient(): return resp[(1 if subfunction is None else 2):] # services - def diagnostic_session_control(self, session_type): + def diagnostic_session_control(self, session_type: SESSION_TYPE): self._uds_request(SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type) - def ecu_reset(self, reset_type): + def ecu_reset(self, reset_type: RESET_TYPE): resp = self._uds_request(SERVICE_TYPE.ECU_RESET, subfunction=reset_type) power_down_time = None if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN: power_down_time = resp[0] return power_down_time - def security_access(self, access_type, security_key=None): + def security_access(self, access_type: ACCESS_TYPE, security_key: bytes=None): request_seed = access_type % 2 != 0 if request_seed and security_key is not None: raise ValueError('security_key not allowed') @@ -462,14 +469,14 @@ class UdsClient(): security_seed = resp return security_seed - def communication_control(self, control_type, message_type): + def communication_control(self, control_type: CONTROL_TYPE, message_type: MESSAGE_TYPE): data = bytes([message_type]) self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data) def tester_present(self, ): self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00) - def access_timing_parameter(self, timing_parameter_type, parameter_values): + def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes=None): write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES read_values = ( timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or @@ -485,16 +492,16 @@ class UdsClient(): parameter_values = resp return parameter_values - def secured_data_transmission(self, data): + def secured_data_transmission(self, data: bytes): # TODO: split data into multiple input parameters? resp = self._uds_request(SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data) # TODO: parse response into multiple output values? return resp - def control_dtc_setting(self, dtc_setting_type): + def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE): self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type) - def response_on_event(self, response_event_type, store_event, window_time, event_type_record, service_response_record): + def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int): if store_event: response_event_type |= 0x20 # TODO: split record parameters into arrays @@ -513,7 +520,7 @@ class UdsClient(): "data": resp[2:], # TODO: parse the reset of response } - def link_control(self, link_control_type, baud_rate_type=None): + def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None): if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE: # baud_rate_type = BAUD_RATE_TYPE data = bytes([baud_rate_type]) @@ -524,7 +531,7 @@ class UdsClient(): data = None self._uds_request(SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data) - def read_data_by_identifier(self, data_identifier_type): + def read_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE): # TODO: support list of identifiers data = struct.pack('!H', data_identifier_type) resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data) @@ -533,7 +540,7 @@ class UdsClient(): raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) return resp[2:] - def read_memory_by_address(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=1): + def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=1): if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: @@ -550,7 +557,7 @@ class UdsClient(): resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data) return resp - def read_scaling_data_by_identifier(self, data_identifier_type): + def read_scaling_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE): data = struct.pack('!H', data_identifier_type) resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None @@ -558,12 +565,12 @@ class UdsClient(): raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) return resp[2:] # TODO: parse the response - def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier): + def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int): # TODO: support list of identifiers data = bytes([transmission_mode_type, periodic_data_identifier]) self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data) - def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1): + def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int=4, memory_size_bytes: int=1): if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: @@ -588,14 +595,14 @@ class UdsClient(): raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type))) self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data) - def write_data_by_identifier(self, data_identifier_type, data_record): + def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes): data = struct.pack('!H', data_identifier_type) + data_record resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) - def write_memory_by_address(self, memory_address, memory_size, data_record, memory_address_bytes=4, memory_size_bytes=1): + def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int=4, memory_size_bytes: int=1): if memory_address_bytes < 1 or memory_address_bytes > 4: raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes)) if memory_size_bytes < 1 or memory_size_bytes > 4: @@ -612,11 +619,11 @@ class UdsClient(): data += data_record self._uds_request(SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data) - def clear_diagnostic_information(self, dtc_group_type): + def clear_diagnostic_information(self, dtc_group_type: DTC_GROUP_TYPE): data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data) - def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record=0xFFFFFF, dtc_snapshot_record_num=0xFF, dtc_extended_record_num=0xFF): + def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int=0xFFFFFF, dtc_snapshot_record_num: int=0xFF, dtc_extended_record_num: int=0xFF): data = b'' # dtc_status_mask_type if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \ @@ -652,15 +659,15 @@ class UdsClient(): # TODO: parse response return resp - def input_output_control_by_identifier(self, data_identifier_type, control_option_record, control_enable_mask_record=b''): - data = struct.pack('!H', data_identifier_type) + control_option_record + control_enable_mask_record + def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes=b''): + data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None if resp_id != data_identifier_type: raise ValueError('invalid response data identifier: {}'.format(hex(resp_id))) return resp[2:] - def routine_control(self, routine_control_type, routine_identifier_type, routine_option_record=b''): + def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes=b''): data = struct.pack('!H', routine_identifier_type) + routine_option_record resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data) resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None @@ -668,7 +675,7 @@ class UdsClient(): raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id))) return resp[2:] - def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00): + def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00): data = bytes([data_format]) if memory_address_bytes < 1 or memory_address_bytes > 4: @@ -693,7 +700,7 @@ class UdsClient(): return max_num_bytes # max number of bytes per transfer data request - def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00): + def request_upload(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00): data = bytes([data_format]) if memory_address_bytes < 1 or memory_address_bytes > 4: @@ -718,7 +725,7 @@ class UdsClient(): return max_num_bytes # max number of bytes per transfer data request - def transfer_data(self, block_sequence_count, data=b''): + def transfer_data(self, block_sequence_count: int, data: bytes=b''): data = bytes([block_sequence_count]) + data resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data) resp_id = resp[0] if len(resp) > 0 else None