syntax errors

This commit is contained in:
Greg Hogan
2018-11-25 12:53:56 -08:00
parent 95be4811ef
commit dca176e717

View File

@@ -1,10 +1,10 @@
#!/usr/bin/env python
import time
import struct
from enum import Enum
from enum import IntEnum
import threading
class SERVICE_TYPE(Enum):
class SERVICE_TYPE(IntEnum):
DIAGNOSTIC_SESSION_CONTROL = 0x10
ECU_RESET = 0x11
SECURITY_ACCESS = 0x27
@@ -83,41 +83,52 @@ class InvalidSubFunctioneError(Exception):
pass
# generic uds request
def _request(address, service_type, subfunction, data=None):
def _request(address, service_type, subfunction=None, data=None):
# TODO: send request
# TODO: wait for response
# raise exception on error
if resp[0] == '\x7f'
error_code = resp[2]
error_desc = _negative_response_codes[error_code]
raise NegativeResponseError('[{}] {}'.format(hex(ord(error_code)), error_desc))
#resp = '\x7f'+chr(service_type)+'\x10'
resp = '\x62\xf1\x90' + '\x57\x30\x4c\x30\x30\x30\x30\x34\x33\x4d\x42\x35\x34\x31\x33\x32\x36'
resp_sid = ord(resp[0]) if len(resp) > 0 else None
if service != resp_sid + 0x40:
# negative response
if resp_sid == 0x7F:
try:
error_service = SERVICE_TYPE(ord(resp[1])).name
except:
error_service = 'NON_STANDARD_SERVICE'
error_code = hex(ord(resp[2])) if len(resp) > 2 else '0x??'
try:
error_desc = _negative_response_codes[resp[2]]
except:
error_desc = 'unknown error'
raise NegativeResponseError('{} - {} - {}'.format(error_service, error_code, error_desc))
# positive response
if service_type+0x40 != resp_sid:
resp_sid_hex = hex(resp_sid) if resp_sid is not None else None
raise InvalidServiceIdError('invalid response service id: {}'.format(resp_sid_hex))
if subfunction is None:
resp_subf = ord(resp[1]) if len(resp) > 1 else None
if subfunction != resp_subf:
resp_subf_hex = hex(resp_subf) if resp_subf is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_subf)))
if subfunction is not None:
resp_sfn = ord(resp[1]) if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn)))
# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
# services
class SESSION_TYPE(Enum):
class SESSION_TYPE(IntEnum):
DEFAULT = 1
PROGRAMMING = 2
EXTENDED_DIAGNOSTIC = 3
SAFETY_SYSTEM_DIAGNOSTIC = 4
def diagnostic_session_control(address, session_type):
_request(address, service=SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)
_request(address, SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)
class RESET_TYPE(Enum):
class RESET_TYPE(IntEnum):
HARD = 1
KEY_OFF_ON = 2
SOFT = 3
@@ -127,11 +138,11 @@ class RESET_TYPE(Enum):
def ecu_reset(address, reset_type):
resp = _request(address, SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = ord(resp[0])
return power_down_time
class ACCESS_TYPE(Enum):
class ACCESS_TYPE(IntEnum):
REQUEST_SEED = 1
SEND_KEY = 2
@@ -141,30 +152,30 @@ def security_access(address, access_type, security_key=None):
raise ValueError('security_key not allowed')
if not request_seed and security_key is None:
raise ValueError('security_key is missing')
resp = _request(address, service=SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key)
resp = _request(address, SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key)
if request_seed:
security_seed = resp
return security_seed
class CONTROL_TYPE(Enum):
class CONTROL_TYPE(IntEnum):
ENABLE_RX_ENABLE_TX = 0
ENABLE_RX_DISABLE_TX = 1
DISABLE_RX_ENABLE_TX = 2
DISABLE_RX_DISABLE_TX = 3
class MESSAGE_TYPE(Enum):
class MESSAGE_TYPE(IntEnum):
NORMAL = 1
NETWORK_MANAGEMENT = 2
NORMAL_AND_NETWORK_MANAGEMENT = 3
def communication_control(address, control_type, message_type):
data = chr(message_type)
_request(address, service=SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
_request(address, SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)
def tester_present(address):
_request(address, service=SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
_request(address, SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)
class TIMING_PARAMETER_TYPE(Enum):
class TIMING_PARAMETER_TYPE(IntEnum):
READ_EXTENDED_SET = 1
SET_TO_DEFAULT_VALUES = 2
READ_CURRENTLY_ACTIVE = 3
@@ -180,7 +191,7 @@ def access_timing_parameter(address, timing_parameter_type, parameter_values):
raise ValueError('parameter_values not allowed')
if write_custom_values and parameter_values is None:
raise ValueError('parameter_values is missing')
resp = _request(address, service=SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values)
resp = _request(address, SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values)
if read_values:
# TODO: parse response into values?
parameter_values = resp
@@ -188,18 +199,18 @@ def access_timing_parameter(address, timing_parameter_type, parameter_values):
def secured_data_transmission(address, data):
# TODO: split data into multiple input parameters?
resp = _request(address, service=SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
# TODO: parse response into multiple output values?
return resp
class DTC_SETTING_TYPE(Enum):
class DTC_SETTING_TYPE(IntEnum):
ON = 1
OFF = 2
def control_dtc_setting(address, dtc_setting_type):
_request(address, service=SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
_request(address, SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)
class RESPONSE_EVENT_TYPE(Enum):
class RESPONSE_EVENT_TYPE(IntEnum):
STOP_RESPONSE_ON_EVENT = 0
ON_DTC_STATUS_CHANGE = 1
ON_TIMER_INTERRUPT = 2
@@ -214,7 +225,7 @@ def response_on_event(address, response_event_type, store_event, window_time, ev
response_event_type |= 0x20
# TODO: split record parameters into arrays
data = char(window_time) + event_type_record + service_response_record
resp = _request(address, service=SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
resp = _request(address, SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
if response_event_type == REPORT_ACTIVATED_EVENTS:
return {
@@ -228,12 +239,12 @@ def response_on_event(address, response_event_type, store_event, window_time, ev
"data": resp[2:], # TODO: parse the reset of response
}
class LINK_CONTROL_TYPE(Enum):
class LINK_CONTROL_TYPE(IntEnum):
VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE = 1
VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE = 2
TRANSITION_BAUDRATE = 3
class BAUD_RATE_TYPE(Enum):
class BAUD_RATE_TYPE(IntEnum):
PC9600 = 1
PC19200 = 2
PC38400 = 3
@@ -253,9 +264,9 @@ def link_control(address, link_control_type, baud_rate_type=None):
data = struct.pack('!I', baud_rate_type)[1:]
else:
data = None
_request(address, service=SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)
_request(address, SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)
class DATA_IDENTIFIER_TYPE(Enum):
class DATA_IDENTIFIER_TYPE(IntEnum):
BOOT_SOFTWARE_IDENTIFICATION = 0XF180
APPLICATION_SOFTWARE_IDENTIFICATION = 0XF181
APPLICATION_DATA_IDENTIFICATION = 0XF182
@@ -291,7 +302,7 @@ class DATA_IDENTIFIER_TYPE(Enum):
def read_data_by_identifier(address, data_identifier_type):
# TODO: support list of identifiers
data = struct.pack('!H', data_identifier_type)
resp = _request(address, service=SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
@@ -304,25 +315,25 @@ def read_memory_by_address(address, memory_address, memory_size, memory_address_
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!BB', memory_size_bytes, memory_address_bytes)
if memory_address >= 1<<(memory_address_bytes*8)
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8)
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = _request(address, service=SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
return resp
def read_scaling_data_by_identifier(address, data_identifier_type):
data = struct.pack('!H', data_identifier_type)
resp = _request(address, service=SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:] # TODO: parse the response
class TRANSMISSION_MODE_TYPE(Enum):
class TRANSMISSION_MODE_TYPE(IntEnum):
SEND_AT_SLOW_RATE = 1
SEND_AT_MEDIUM_RATE = 2
SEND_AT_FAST_RATE = 3
@@ -331,9 +342,9 @@ class TRANSMISSION_MODE_TYPE(Enum):
def read_data_by_periodic_identifier(address, transmission_mode_type, periodic_data_identifier):
# TODO: support list of identifiers
data = chr(transmission_mode_type) + chr(periodic_data_identifier)
_request(address, service=SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
_request(address, SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)
class DYNAMIC_DEFINITION_TYPE(Enum):
class DYNAMIC_DEFINITION_TYPE(IntEnum):
DEFINE_BY_IDENTIFIER = 1
DEFINE_BY_MEMORY_ADDRESS = 2
CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER = 3
@@ -352,21 +363,21 @@ def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8)
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if s["memory_size"] >= 1<<(memory_size_bytes*8)
if s["memory_size"] >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s["memory_size"]))
data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:]
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
pass
else:
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
_request(address, service=SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
_request(address, SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)
def write_data_by_identifier(address, data_identifier_type, data_record):
data = struct.pack('!H', data_identifier_type) + data_record
resp = _request(address, service=SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
@@ -378,25 +389,25 @@ def write_memory_by_address(address, memory_address, memory_size, data_record, m
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!BB', memory_size_bytes, memory_address_bytes)
if memory_address >= 1<<(memory_address_bytes*8)
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8)
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
data += data_record
_request(address, service=SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)
_request(address, SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)
class DTC_GROUP_TYPE(Enum):
class DTC_GROUP_TYPE(IntEnum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF
def clear_diagnostic_information(address, dtc_group_type):
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
_request(address, service=SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
_request(address, SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)
class DTC_REPORT_TYPE(Enum):
class DTC_REPORT_TYPE(IntEnum):
NUMBER_OF_DTC_BY_STATUS_MASK = 0x01
DTC_BY_STATUS_MASK = 0x02
DTC_SNAPSHOT_IDENTIFICATION = 0x03
@@ -419,7 +430,7 @@ class DTC_REPORT_TYPE(Enum):
DTC_FAULT_DETECTION_COUNTER = 0x14
DTC_WITH_PERMANENT_STATUS = 0x15
class DTC_STATUS_MASK_TYPE(Enum):
class DTC_STATUS_MASK_TYPE(IntEnum):
TEST_FAILED = 0x01
TEST_FAILED_THIS_OPERATION_CYCLE = 0x02
PENDING_DTC = 0x04
@@ -430,7 +441,7 @@ class DTC_STATUS_MASK_TYPE(Enum):
WARNING_INDICATOR_REQUESTED = 0x80
ALL = 0xFF
class DTC_SEVERITY_MASK_TYPE(Enum):
class DTC_SEVERITY_MASK_TYPE(IntEnum):
MAINTENANCE_ONLY = 0x20
CHECK_AT_NEXT_HALT = 0x40
CHECK_IMMEDIATELY = 0x80
@@ -439,40 +450,40 @@ class DTC_SEVERITY_MASK_TYPE(Enum):
def read_dtc_information(address, dtc_report_type, dtc_status_mask_type=DTC_STATUS_MASK_TYPE.ALL, dtc_severity_mask_type=DTC_SEVERITY_MASK_TYPE.ALL, dtc_mask_record=0xFFFFFF, dtc_snapshot_record_num=0xFF, dtc_extended_record_num=0xFF):
data = ''
# dtc_status_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_MIRROR_MEMORY_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK or \
dtc_report_type == DTC_REPORT_TYPE.EMISSIONS_RELATED_OBD_DTC_BY_STATUS_MASK:
data += chr(dtc_status_mask_type)
# dtc_mask_record
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or
dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.SEVERITY_INFORMATION_OF_DTC:
data += struct.pack('!I', dtc_mask_record)[1:] # 3 bytes
# dtc_snapshot_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER:
data += ord(dtc_snapshot_record_num)
# dtc_extended_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
data += chr(dtc_extended_record_num)
# dtc_severity_mask_type
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or
if dtc_report_type == DTC_REPORT_TYPE.NUMBER_OF_DTC_BY_SEVERITY_MASK_RECORD or \
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
data += chr(dtc_severity_mask_type) + chr(dtc_status_mask_type)
resp = _request(address, service=SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
resp = _request(address, SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)
# TODO: parse response
return resp
class CONTROL_OPTION_TYPE(Enum):
class CONTROL_OPTION_TYPE(IntEnum):
RETURN_CONTROL_TO_ECU = 0
RESET_TO_DEFAULT = 1
FREEZE_CURRENT_STATE = 2
@@ -480,25 +491,25 @@ class CONTROL_OPTION_TYPE(Enum):
def input_output_control_by_identifier(address, data_identifier_type, control_option_record, control_enable_mask_record=''):
data = struct.pack('!H', data_identifier_type) + control_option_record + control_enable_mask_record
resp = _request(address, service=SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
return resp[2:]
class ROUTINE_CONTROL_TYPE(Enum):
class ROUTINE_CONTROL_TYPE(IntEnum):
START = 1
STOP = 2
REQUEST_RESULTS = 3
class ROUTINE_IDENTIFIER_TYPE(Enum):
class ROUTINE_IDENTIFIER_TYPE(IntEnum):
ERASE_MEMORY = 0xFF00
CHECK_PROGRAMMING_DEPENDENCIES = 0xFF01
ERASE_MIRROR_MEMORY_DTCS = 0xFF02
def routine_control(address, routine_control_type, routine_identifier_type, routine_option_record=''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
_request(address, service=SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
_request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp = resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
@@ -513,14 +524,14 @@ def request_download(address, memory_address, memory_size, memory_address_bytes=
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
if memory_address >= 1<<(memory_address_bytes*8)
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8)
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = _request(address, service=SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
@@ -538,14 +549,14 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
if memory_address >= 1<<(memory_address_bytes*8)
if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
data += struct.pack('!I', memory_address)[4-memory_address_bytes:]
if memory_size >= 1<<(memory_size_bytes*8)
if memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = _request(address, service=SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
@@ -555,16 +566,16 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
return max_num_bytes # max number of bytes per transfer data request
def transfer_data(address, block_sequence_count, data=''):
resp = _request(address, service=SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp = _request(address, SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = ord(resp[0]) if len(resp) > 0 else None
if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
return resp[1:]
def request_transfer_exit(address)
_request(address, service=SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None)
def request_transfer_exit(address):
_request(address, SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None)
if __name__ == "__main__":
from . import uds
# examples
vin = uds.read_data_by_identifier(0x18da10f1, uds.DATA_IDENTIFIER.VIN)
vin = read_data_by_identifier(0x18da10f1, DATA_IDENTIFIER_TYPE.VIN)
print(vin)