This commit is contained in:
Greg Hogan 2019-10-13 20:43:55 -07:00
parent 768fdf7e19
commit b64d6fa5d2
1 changed files with 35 additions and 28 deletions

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import time
import struct
from typing import NamedTuple, List
from enum import IntEnum
from queue import Queue, Empty
from threading import Thread
@ -141,6 +142,12 @@ class DYNAMIC_DEFINITION_TYPE(IntEnum):
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3
class DynamicSourceDefinition(NamedTuple):
data_identifier: int
position: int
memory_size: int
memory_address: int
class DTC_GROUP_TYPE(IntEnum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF
@ -185,7 +192,7 @@ class DTC_SEVERITY_MASK_TYPE(IntEnum):
CHECK_IMMEDIATELY = 0x80
ALL = 0xE0
class CONTROL_OPTION_TYPE(IntEnum):
class CONTROL_PARAMETER_TYPE(IntEnum):
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
@ -261,7 +268,7 @@ _negative_response_codes = {
}
class UdsClient():
def __init__(self, panda, tx_addr, rx_addr=None, bus=0, timeout=10, debug=False):
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: int=10, debug: bool=False):
self.panda = panda
self.bus = bus
self.tx_addr = tx_addr
@ -284,7 +291,7 @@ class UdsClient():
self.can_reader_t.daemon = True
self.can_reader_t.start()
def _isotp_thread(self, debug):
def _isotp_thread(self, debug: bool=False):
try:
rx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
tx_frame = {"size": 0, "data": b"", "idx": 0, "done": True}
@ -389,7 +396,7 @@ class UdsClient():
self.rx_queue.put(None)
# generic uds request
def _uds_request(self, service_type, subfunction=None, data=None):
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes:
req = bytes([service_type])
if subfunction is not None:
req += bytes([subfunction])
@ -441,17 +448,17 @@ class UdsClient():
return resp[(1 if subfunction is None else 2):]
# services
def diagnostic_session_control(self, session_type):
def diagnostic_session_control(self, session_type: SESSION_TYPE):
self._uds_request(SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)
def ecu_reset(self, reset_type):
def ecu_reset(self, reset_type: RESET_TYPE):
resp = self._uds_request(SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = resp[0]
return power_down_time
def security_access(self, access_type, security_key=None):
def security_access(self, access_type: ACCESS_TYPE, security_key: bytes=None):
request_seed = access_type % 2 != 0
if request_seed and security_key is not None:
raise ValueError('security_key not allowed')
@ -462,14 +469,14 @@ class UdsClient():
security_seed = resp
return security_seed
def communication_control(self, control_type, message_type):
def communication_control(self, control_type: CONTROL_TYPE, message_type: MESSAGE_TYPE):
data = bytes([message_type])
self._uds_request(SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
def tester_present(self, ):
self._uds_request(SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
def access_timing_parameter(self, timing_parameter_type, parameter_values):
def access_timing_parameter(self, timing_parameter_type: TIMING_PARAMETER_TYPE, parameter_values: bytes=None):
write_custom_values = timing_parameter_type == TIMING_PARAMETER_TYPE.SET_TO_GIVEN_VALUES
read_values = (
timing_parameter_type == TIMING_PARAMETER_TYPE.READ_CURRENTLY_ACTIVE or
@ -485,16 +492,16 @@ class UdsClient():
parameter_values = resp
return parameter_values
def secured_data_transmission(self, data):
def secured_data_transmission(self, data: bytes):
# TODO: split data into multiple input parameters?
resp = self._uds_request(SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
# TODO: parse response into multiple output values?
return resp
def control_dtc_setting(self, dtc_setting_type):
def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE):
self._uds_request(SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
def response_on_event(self, response_event_type, store_event, window_time, event_type_record, service_response_record):
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int):
if store_event:
response_event_type |= 0x20
# TODO: split record parameters into arrays
@ -513,7 +520,7 @@ class UdsClient():
"data": resp[2:], # TODO: parse the reset of response
}
def link_control(self, link_control_type, baud_rate_type=None):
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None):
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = bytes([baud_rate_type])
@ -524,7 +531,7 @@ class UdsClient():
data = None
self._uds_request(SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)
def read_data_by_identifier(self, data_identifier_type):
def read_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
# TODO: support list of identifiers
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
@ -533,7 +540,7 @@ class UdsClient():
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
def read_memory_by_address(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=1):
def read_memory_by_address(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
@ -550,7 +557,7 @@ class UdsClient():
resp = self._uds_request(SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
return resp
def read_scaling_data_by_identifier(self, data_identifier_type):
def read_scaling_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE):
data = struct.pack('!H', data_identifier_type)
resp = self._uds_request(SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
@ -558,12 +565,12 @@ class UdsClient():
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:] # TODO: parse the response
def read_data_by_periodic_identifier(self, transmission_mode_type, periodic_data_identifier):
def read_data_by_periodic_identifier(self, transmission_mode_type: TRANSMISSION_MODE_TYPE, periodic_data_identifier: int):
# TODO: support list of identifiers
data = bytes([transmission_mode_type, periodic_data_identifier])
self._uds_request(SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
def dynamically_define_data_identifier(self, dynamic_definition_type, dynamic_data_identifier, source_definitions, memory_address_bytes=4, memory_size_bytes=1):
def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DEFINITION_TYPE, dynamic_data_identifier: int, source_definitions: List[DynamicSourceDefinition], memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
@ -588,14 +595,14 @@ class UdsClient():
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
self._uds_request(SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
def write_data_by_identifier(self, data_identifier_type, data_record):
def write_data_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, data_record: bytes):
data = struct.pack('!H', data_identifier_type) + data_record
resp = self._uds_request(SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
def write_memory_by_address(self, memory_address, memory_size, data_record, memory_address_bytes=4, memory_size_bytes=1):
def write_memory_by_address(self, memory_address: int, memory_size: int, data_record: bytes, memory_address_bytes: int=4, memory_size_bytes: int=1):
if memory_address_bytes < 1 or memory_address_bytes > 4:
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
@ -612,11 +619,11 @@ class UdsClient():
data += data_record
self._uds_request(SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)
def clear_diagnostic_information(self, dtc_group_type):
def clear_diagnostic_information(self, dtc_group_type: DTC_GROUP_TYPE):
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
self._uds_request(SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
def read_dtc_information(self, dtc_report_type, dtc_status_mask_type=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record=0xFFFFFF, dtc_snapshot_record_num=0xFF, dtc_extended_record_num=0xFF):
def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask_type: DTC_STATUS_MASK_TYPE=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type: DTC_SEVERITY_MASK_TYPE=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record: int=0xFFFFFF, dtc_snapshot_record_num: int=0xFF, dtc_extended_record_num: int=0xFF):
data = b''
# dtc_status_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \
@ -652,15 +659,15 @@ class UdsClient():
# TODO: parse response
return resp
def input_output_control_by_identifier(self, data_identifier_type, control_option_record, control_enable_mask_record=b''):
data = struct.pack('!H', data_identifier_type) + control_option_record + control_enable_mask_record
def input_output_control_by_identifier(self, data_identifier_type: DATA_IDENTIFIER_TYPE, control_parameter_type: CONTROL_PARAMETER_TYPE, control_option_record: bytes, control_enable_mask_record: bytes=b''):
data = struct.pack('!H', data_identifier_type) + bytes([control_parameter_type]) + control_option_record + control_enable_mask_record
resp = self._uds_request(SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
def routine_control(self, routine_control_type, routine_identifier_type, routine_option_record=b''):
def routine_control(self, routine_control_type: ROUTINE_CONTROL_TYPE, routine_identifier_type: ROUTINE_IDENTIFIER_TYPE, routine_option_record: bytes=b''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
resp = self._uds_request(SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
@ -668,7 +675,7 @@ class UdsClient():
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
return resp[2:]
def request_download(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
def request_download(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00):
data = bytes([data_format])
if memory_address_bytes < 1 or memory_address_bytes > 4:
@ -693,7 +700,7 @@ class UdsClient():
return max_num_bytes # max number of bytes per transfer data request
def request_upload(self, memory_address, memory_size, memory_address_bytes=4, memory_size_bytes=4, data_format=0x00):
def request_upload(self, memory_address: int, memory_size: int, memory_address_bytes: int=4, memory_size_bytes: int=4, data_format: int=0x00):
data = bytes([data_format])
if memory_address_bytes < 1 or memory_address_bytes > 4:
@ -718,7 +725,7 @@ class UdsClient():
return max_num_bytes # max number of bytes per transfer data request
def transfer_data(self, block_sequence_count, data=b''):
def transfer_data(self, block_sequence_count: int, data: bytes=b''):
data = bytes([block_sequence_count]) + data
resp = self._uds_request(SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = resp[0] if len(resp) > 0 else None