remove more kline usage

This commit is contained in:
Adeeb Shihadeh
2023-11-23 17:29:53 -08:00
parent 666a462450
commit 574d2db10d
3 changed files with 0 additions and 142 deletions

View File

@@ -67,102 +67,6 @@ class ELMCarSimulator():
if self.__can_enable:
self.__can_monitor_thread.start()
#########################
# LIN related functions #
#########################
def __lin_monitor(self):
print("STARTING LIN THREAD")
self.panda.set_uart_baud(2, 10400)
self.panda.kline_drain() # Toss whatever was already there
lin_buff = bytearray()
while not self.__stop:
lin_msg = self.panda.serial_read(2)
if not lin_msg:
continue
lin_buff += lin_msg
#print(" ** Buff", lin_buff)
if lin_buff.endswith(b'\x00\xc1\x33\xf1\x81\x66'): # Leading 0 is wakeup
lin_buff = bytearray()
self.__lin_active = True
print("GOT LIN (KWP FAST) WAKEUP SIGNAL")
self._lin_send(0x10, b'\xC1\x8F\xE9')
self.__reset_lin_timeout()
continue
if self.__lin_active:
msglen = lin_buff[0] & 0x7
if lin_buff[0] & 0xF8 not in (0x80, 0xC0):
print("Invalid bytes at start of message")
print(" BUFF", lin_buff)
continue
if len(lin_buff) < msglen + 4:
continue
if lin_checksum(lin_buff[:-1]) != lin_buff[-1]:
continue
self.__lin_process_msg(lin_buff[0] & 0xF8, # Priority
lin_buff[1], lin_buff[2], lin_buff[3:-1])
lin_buff = bytearray()
def _lin_send(self, to_addr, msg):
if not self.__silent:
print(" LIN Reply (%x)" % to_addr, binascii.hexlify(msg))
PHYS_ADDR = 0x80
#FUNC_ADDR = 0xC0
RECV = 0xF1
#SEND = 0x33 # Car OBD Functional Address
headers = struct.pack("BBB", PHYS_ADDR | len(msg), RECV, to_addr)
if not self.__silent:
print(" Sending LIN", binascii.hexlify(headers + msg),
hex(sum(bytearray(headers + msg)) % 0x100))
self.panda.kline_send(headers + msg)
def __reset_lin_timeout(self):
if self.__lin_timer:
self.__lin_timer.cancel()
self.__lin_timer = threading.Timer(5, self.__lin_timeout_handler)
self.__lin_timer.start()
def __lin_timeout_handler(self):
print("LIN TIMEOUT")
self.__lin_timer = None
self.__lin_active = False
@property
def lin_active(self):
return self.__lin_active
def __lin_process_msg(self, priority, toaddr, fromaddr, data):
self.__reset_lin_timeout()
if not self.__silent and data != b'\x3E':
print("LIN MSG", "Addr:", hex(toaddr), "obdLen:", len(data),
binascii.hexlify(data))
outmsg = None
#if data == b'\x3E':
# print("KEEP ALIVE")
#el
if len(data) > 1:
outmsg = self._process_obd(data[0], data[1])
if outmsg:
obd_header = struct.pack("BB", 0x40 | data[0], data[1])
if len(outmsg) <= 5:
self._lin_send(0x10, obd_header + outmsg)
else:
first_msg_len = min(4, len(outmsg) % 4) or 4
self._lin_send(0x10, obd_header + b'\x01' +
b'\x00' * (4 - first_msg_len) +
outmsg[:first_msg_len])
for num, i in enumerate(range(first_msg_len, len(outmsg), 4)):
self._lin_send(0x10, obd_header +
struct.pack('B', (num + 2) % 0x100) + outmsg[i:i + 4])
#########################
# CAN related functions #
#########################

View File

@@ -4,8 +4,6 @@ import os
import time
import random
import argparse
from hexdump import hexdump
from itertools import permutations
from panda import Panda
@@ -38,27 +36,6 @@ def run_test_w_pandas(pandas, sleep_duration):
# **** test health packet ****
print("health", ho[0], h[ho[0]].health())
# **** test K/L line loopback ****
for bus in [2, 3]:
# flush the output
h[ho[1]].kline_drain(bus=bus)
# send the characters
st = get_test_string()
st = bytes([0xaa, len(st) + 3]) + st
h[ho[0]].kline_send(st, bus=bus, checksum=False)
# check for receive
ret = h[ho[1]].kline_drain(bus=bus)
print("ST Data:")
hexdump(st)
print("RET Data:")
hexdump(ret)
assert st == ret
print("K/L pass", bus, ho, "\n")
time.sleep(sleep_duration)
# **** test can line loopback ****
for bus, gmlan in [(0, False), (1, False), (2, False), (1, True), (2, True)]:
print("\ntest can", bus)

View File

@@ -4,8 +4,6 @@ import os
import time
import random
import argparse
from hexdump import hexdump
from itertools import permutations
from panda import Panda
@@ -38,27 +36,6 @@ def run_test_w_pandas(pandas, sleep_duration):
# **** test health packet ****
print("health", ho[0], h[ho[0]].health())
# **** test K/L line loopback ****
for bus in [2, 3]:
# flush the output
h[ho[1]].kline_drain(bus=bus)
# send the characters
st = get_test_string()
st = bytes([0xaa, len(st) + 3]) + st
h[ho[0]].kline_send(st, bus=bus, checksum=False)
# check for receive
ret = h[ho[1]].kline_drain(bus=bus)
print("ST Data:")
hexdump(st)
print("RET Data:")
hexdump(ret)
assert st == ret
print("K/L pass", bus, ho, "\n")
time.sleep(sleep_duration)
# **** test can line loopback ****
# for bus, gmlan in [(0, None), (1, False), (2, False), (1, True), (2, True)]:
for bus, gmlan in [(0, None), (1, None)]: