diff --git a/tests/elm_car_simulator.py b/tests/elm_car_simulator.py index 27afdbad..119c6b68 100755 --- a/tests/elm_car_simulator.py +++ b/tests/elm_car_simulator.py @@ -67,102 +67,6 @@ class ELMCarSimulator(): if self.__can_enable: self.__can_monitor_thread.start() - ######################### - # LIN related functions # - ######################### - - def __lin_monitor(self): - print("STARTING LIN THREAD") - self.panda.set_uart_baud(2, 10400) - self.panda.kline_drain() # Toss whatever was already there - - lin_buff = bytearray() - - while not self.__stop: - lin_msg = self.panda.serial_read(2) - if not lin_msg: - continue - - lin_buff += lin_msg - #print(" ** Buff", lin_buff) - if lin_buff.endswith(b'\x00\xc1\x33\xf1\x81\x66'): # Leading 0 is wakeup - lin_buff = bytearray() - self.__lin_active = True - print("GOT LIN (KWP FAST) WAKEUP SIGNAL") - self._lin_send(0x10, b'\xC1\x8F\xE9') - self.__reset_lin_timeout() - continue - if self.__lin_active: - msglen = lin_buff[0] & 0x7 - if lin_buff[0] & 0xF8 not in (0x80, 0xC0): - print("Invalid bytes at start of message") - print(" BUFF", lin_buff) - continue - if len(lin_buff) < msglen + 4: - continue - if lin_checksum(lin_buff[:-1]) != lin_buff[-1]: - continue - self.__lin_process_msg(lin_buff[0] & 0xF8, # Priority - lin_buff[1], lin_buff[2], lin_buff[3:-1]) - lin_buff = bytearray() - - def _lin_send(self, to_addr, msg): - if not self.__silent: - print(" LIN Reply (%x)" % to_addr, binascii.hexlify(msg)) - - PHYS_ADDR = 0x80 - #FUNC_ADDR = 0xC0 - RECV = 0xF1 - #SEND = 0x33 # Car OBD Functional Address - headers = struct.pack("BBB", PHYS_ADDR | len(msg), RECV, to_addr) - if not self.__silent: - print(" Sending LIN", binascii.hexlify(headers + msg), - hex(sum(bytearray(headers + msg)) % 0x100)) - self.panda.kline_send(headers + msg) - - def __reset_lin_timeout(self): - if self.__lin_timer: - self.__lin_timer.cancel() - self.__lin_timer = threading.Timer(5, self.__lin_timeout_handler) - self.__lin_timer.start() - - def __lin_timeout_handler(self): - print("LIN TIMEOUT") - self.__lin_timer = None - self.__lin_active = False - - @property - def lin_active(self): - return self.__lin_active - - def __lin_process_msg(self, priority, toaddr, fromaddr, data): - self.__reset_lin_timeout() - - if not self.__silent and data != b'\x3E': - print("LIN MSG", "Addr:", hex(toaddr), "obdLen:", len(data), - binascii.hexlify(data)) - - outmsg = None - #if data == b'\x3E': - # print("KEEP ALIVE") - #el - if len(data) > 1: - outmsg = self._process_obd(data[0], data[1]) - - if outmsg: - obd_header = struct.pack("BB", 0x40 | data[0], data[1]) - if len(outmsg) <= 5: - self._lin_send(0x10, obd_header + outmsg) - else: - first_msg_len = min(4, len(outmsg) % 4) or 4 - self._lin_send(0x10, obd_header + b'\x01' + - b'\x00' * (4 - first_msg_len) + - outmsg[:first_msg_len]) - - for num, i in enumerate(range(first_msg_len, len(outmsg), 4)): - self._lin_send(0x10, obd_header + - struct.pack('B', (num + 2) % 0x100) + outmsg[i:i + 4]) - ######################### # CAN related functions # ######################### diff --git a/tests/loopback_test.py b/tests/loopback_test.py index 73bb91be..d4f8bebf 100755 --- a/tests/loopback_test.py +++ b/tests/loopback_test.py @@ -4,8 +4,6 @@ import os import time import random import argparse - -from hexdump import hexdump from itertools import permutations from panda import Panda @@ -38,27 +36,6 @@ def run_test_w_pandas(pandas, sleep_duration): # **** test health packet **** print("health", ho[0], h[ho[0]].health()) - # **** test K/L line loopback **** - for bus in [2, 3]: - # flush the output - h[ho[1]].kline_drain(bus=bus) - - # send the characters - st = get_test_string() - st = bytes([0xaa, len(st) + 3]) + st - h[ho[0]].kline_send(st, bus=bus, checksum=False) - - # check for receive - ret = h[ho[1]].kline_drain(bus=bus) - - print("ST Data:") - hexdump(st) - print("RET Data:") - hexdump(ret) - assert st == ret - print("K/L pass", bus, ho, "\n") - time.sleep(sleep_duration) - # **** test can line loopback **** for bus, gmlan in [(0, False), (1, False), (2, False), (1, True), (2, True)]: print("\ntest can", bus) diff --git a/tests/tucan_loopback.py b/tests/tucan_loopback.py index 1bf1254a..457facdf 100755 --- a/tests/tucan_loopback.py +++ b/tests/tucan_loopback.py @@ -4,8 +4,6 @@ import os import time import random import argparse - -from hexdump import hexdump from itertools import permutations from panda import Panda @@ -38,27 +36,6 @@ def run_test_w_pandas(pandas, sleep_duration): # **** test health packet **** print("health", ho[0], h[ho[0]].health()) - # **** test K/L line loopback **** - for bus in [2, 3]: - # flush the output - h[ho[1]].kline_drain(bus=bus) - - # send the characters - st = get_test_string() - st = bytes([0xaa, len(st) + 3]) + st - h[ho[0]].kline_send(st, bus=bus, checksum=False) - - # check for receive - ret = h[ho[1]].kline_drain(bus=bus) - - print("ST Data:") - hexdump(st) - print("RET Data:") - hexdump(ret) - assert st == ret - print("K/L pass", bus, ho, "\n") - time.sleep(sleep_duration) - # **** test can line loopback **** # for bus, gmlan in [(0, None), (1, False), (2, False), (1, True), (2, True)]: for bus, gmlan in [(0, None), (1, None)]: