Defining types in return dictionaries (#1923)

* Defining types in return dictionaries

* Correcting indent

* Returning the class instead of dict

* Fixing failing linter

* Fixing whitespace error
This commit is contained in:
MarinkoMagla
2024-08-31 22:01:28 +02:00
committed by GitHub
parent bd6cec3b29
commit a36ca220fa

View File

@@ -2,6 +2,35 @@ import sys
import time
import struct
from enum import IntEnum, Enum
from dataclasses import dataclass
from typing import Optional
@dataclass
class ExchangeStationIdsReturn:
id_length: int
data_type: int
available: int
protected: int
@dataclass
class GetDaqListSizeReturn:
list_size: int
first_pid: int
@dataclass
class GetSessionStatusReturn:
status: int
info: Optional[int]
@dataclass
class DiagnosticServiceReturn:
length: int
type: int
@dataclass
class ActionServiceReturn:
length: int
type: int
class COMMAND_CODE(IntEnum):
CONNECT = 0x01
@@ -140,15 +169,10 @@ class CcpClient():
self._send_cro(COMMAND_CODE.CONNECT, struct.pack("<H", station_addr))
self._recv_dto(0.025)
def exchange_station_ids(self, device_id_info: bytes = b"") -> dict:
def exchange_station_ids(self, device_id_info: bytes = b"") -> ExchangeStationIdsReturn:
self._send_cro(COMMAND_CODE.EXCHANGE_ID, device_id_info)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"id_length": resp[0],
"data_type": resp[1],
"available": resp[2],
"protected": resp[3],
}
return ExchangeStationIdsReturn(id_length=resp[0], data_type=resp[1], available=resp[2], protected=resp[3])
def get_seed(self, resource_mask: int) -> bytes:
if resource_mask > 255:
@@ -211,15 +235,12 @@ class CcpClient():
self._send_cro(COMMAND_CODE.SELECT_CAL_PAGE)
self._recv_dto(0.025)
def get_daq_list_size(self, list_num: int, can_id: int = 0) -> dict:
def get_daq_list_size(self, list_num: int, can_id: int = 0) -> GetDaqListSizeReturn:
if list_num > 255:
raise ValueError("list number must be less than 256")
self._send_cro(COMMAND_CODE.GET_DAQ_SIZE, bytes([list_num, 0]) + struct.pack(f"{self.byte_order.value}I", can_id))
resp = self._recv_dto(0.025)
return { # TODO: define a type
"list_size": resp[0],
"first_pid": resp[1],
}
return GetDaqListSizeReturn(list_size=resp[0], first_pid=resp[1])
def set_daq_list_pointer(self, list_num: int, odt_num: int, element_num: int) -> None:
if list_num > 255:
@@ -266,13 +287,11 @@ class CcpClient():
self._send_cro(COMMAND_CODE.SET_S_STATUS, bytes([status]))
self._recv_dto(0.025)
def get_session_status(self) -> dict:
def get_session_status(self) -> GetSessionStatusReturn:
self._send_cro(COMMAND_CODE.GET_S_STATUS)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"status": resp[0],
"info": resp[2] if resp[1] else None,
}
info = resp[2] if resp[1] else None
return GetSessionStatusReturn(status=resp[0], info=info)
def build_checksum(self, size: int) -> bytes:
self._send_cro(COMMAND_CODE.BUILD_CHKSUM, struct.pack(f"{self.byte_order.value}I", size))
@@ -310,29 +329,23 @@ class CcpClient():
self._send_cro(COMMAND_CODE.MOVE, struct.pack(f"{self.byte_order.value}I", size))
self._recv_dto(0.025)
def diagnostic_service(self, service_num: int, data: bytes = b"") -> dict:
def diagnostic_service(self, service_num: int, data: bytes = b"") -> DiagnosticServiceReturn:
if service_num > 65535:
raise ValueError("service number must be less than 65536")
if len(data) > 4:
raise ValueError("max data size is 4 bytes")
self._send_cro(COMMAND_CODE.DIAG_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"length": resp[0],
"type": resp[1],
}
return DiagnosticServiceReturn(length=resp[0], type=resp[1])
def action_service(self, service_num: int, data: bytes = b"") -> dict:
def action_service(self, service_num: int, data: bytes = b"") -> ActionServiceReturn:
if service_num > 65535:
raise ValueError("service number must be less than 65536")
if len(data) > 4:
raise ValueError("max data size is 4 bytes")
self._send_cro(COMMAND_CODE.ACTION_SERVICE, struct.pack(f"{self.byte_order.value}H", service_num) + data)
resp = self._recv_dto(0.025)
return { # TODO: define a type
"length": resp[0],
"type": resp[1],
}
return ActionServiceReturn(length=resp[0], type=resp[1])
def test_availability(self, station_addr: int) -> None:
if station_addr > 65535: