Merge panda subtree

This commit is contained in:
Vehicle Researcher
2020-05-31 13:22:43 -07:00
60 changed files with 1229 additions and 1932 deletions

View File

@@ -10,12 +10,13 @@ import time
import traceback
import subprocess
import sys
from .dfu import PandaDFU
from .esptool import ESPROM, CesantaFlasher # noqa: F401
from .flash_release import flash_release # noqa: F401
from .update import ensure_st_up_to_date # noqa: F401
from .serial import PandaSerial # noqa: F401
from .isotp import isotp_send, isotp_recv
from .dfu import PandaDFU # pylint: disable=import-error
from .esptool import ESPROM, CesantaFlasher # noqa pylint: disable=import-error
from .flash_release import flash_release # noqa pylint: disable=import-error
from .update import ensure_st_up_to_date # noqa pylint: disable=import-error
from .serial import PandaSerial # noqa pylint: disable=import-error
from .isotp import isotp_send, isotp_recv # pylint: disable=import-error
__version__ = '0.0.9'

View File

@@ -2,7 +2,7 @@
import time
import struct
from collections import deque
from typing import Callable, NamedTuple, Tuple, List
from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast
from enum import IntEnum
class SERVICE_TYPE(IntEnum):
@@ -271,12 +271,12 @@ _negative_response_codes = {
class CanClient():
def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False):
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.rx_buff = deque()
self.rx_buff = deque() # type: Deque[bytes]
self.sub_addr = sub_addr
self.bus = bus
self.debug = debug
@@ -320,7 +320,7 @@ class CanClient():
if len(msgs) < 254:
return
def recv(self, drain: bool=False) -> List[bytes]:
def recv(self, drain: bool=False) -> Generator[bytes, None, None]:
# buffer rx messages in case two response messages are received at once
# (e.g. response pending and success/failure response)
self._recv_buffer(drain)
@@ -372,7 +372,7 @@ class IsoTpMessage():
self._tx_first_frame()
def _tx_first_frame(self) -> None:
if self.tx_len < 8:
if self.tx_len < self.max_len:
# single frame (send all bytes)
if self.debug: print("ISO-TP: TX - single frame")
msg = (bytes([self.tx_len]) + self.tx_dat).ljust(self.max_len, b"\x00")
@@ -380,10 +380,10 @@ class IsoTpMessage():
else:
# first frame (send first 6 bytes)
if self.debug: print("ISO-TP: TX - first frame")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(self.max_len, b"\x00")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:self.max_len - 2]).ljust(self.max_len - 2, b"\x00")
self._can_client.send([msg])
def recv(self) -> bytes:
def recv(self) -> Optional[bytes]:
start_time = time.time()
try:
while True:
@@ -416,7 +416,7 @@ class IsoTpMessage():
self.rx_idx = 0
self.rx_done = False
if self.debug: print(f"ISO-TP: RX - first frame - idx={self.rx_idx} done={self.rx_done}")
if self.debug: print(f"ISO-TP: TX - flow control continue")
if self.debug: print("ISO-TP: TX - flow control continue")
# send flow control message (send all bytes)
msg = b"\x30\x00\x00".ljust(self.max_len, b"\x00")
self._can_client.send([msg])
@@ -505,6 +505,10 @@ class UdsClient():
isotp_msg.send(req)
while True:
resp = isotp_msg.recv()
if resp is None:
continue
resp_sid = resp[0] if len(resp) > 0 else None
# negative response
@@ -518,7 +522,7 @@ class UdsClient():
try:
error_desc = _negative_response_codes[error_code]
except BaseException:
error_desc = resp[3:]
error_desc = resp[3:].hex()
# wait for another message if response pending
if error_code == 0x78:
if self.debug: print("UDS-RX: response pending")
@@ -534,7 +538,7 @@ class UdsClient():
resp_sfn = resp[1] if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn_hex)))
raise InvalidSubFunctioneError(f'invalid response subfunction: {resp_sfn_hex:x}')
# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
@@ -595,7 +599,7 @@ class UdsClient():
def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int):
if store_event:
response_event_type |= 0x20
response_event_type |= 0x20 # type: ignore
# TODO: split record parameters into arrays
data = bytes([window_time, event_type_record, service_response_record])
resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
@@ -613,9 +617,11 @@ class UdsClient():
}
def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None):
data : Optional[bytes]
if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = bytes([baud_rate_type])
data = bytes([cast(int, baud_rate_type)])
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
# baud_rate_type = custom value (3 bytes big-endian)
data = struct.pack('!I', baud_rate_type)[1:]
@@ -671,16 +677,16 @@ class UdsClient():
data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
data += struct.pack('!H', s["data_identifier"]) + bytes([s["position"], s["memory_size"]])
data += struct.pack('!H', s.data_identifier) + bytes([s.position, s.memory_size])
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
data += struct.pack('!I', s["memory_address"])[4-memory_address_bytes:]
if s["memory_size"] >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s["memory_size"]))
data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:]
if s.memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s.memory_address))
data += struct.pack('!I', s.memory_address)[4-memory_address_bytes:]
if s.memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s.memory_size))
data += struct.pack('!I', s.memory_size)[4-memory_size_bytes:]
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
pass
else:
@@ -736,7 +742,7 @@ class UdsClient():
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER:
data += ord(dtc_snapshot_record_num)
data += bytes([dtc_snapshot_record_num])
# dtc_extended_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
@@ -784,7 +790,7 @@ class UdsClient():
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
@@ -809,7 +815,7 @@ class UdsClient():
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]
resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else: